Skip to content

Commit

Permalink
branch-3.0: [fix](mtmv) Fix data wrong when query with table operator…
Browse files Browse the repository at this point in the history
… such as TABLESAMPLE or tablet and so on #43030 (#44873)

Cherry-picked from #43030

Co-authored-by: seawinde <[email protected]>
  • Loading branch information
github-actions[bot] and seawinde authored Dec 6, 2024
1 parent 4f78c0c commit 4952a7c
Show file tree
Hide file tree
Showing 14 changed files with 482 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
ctx.setThreadLocalInfo();
// Debug session variable should be disabled when refreshed
ctx.getSessionVariable().skipDeletePredicate = false;
ctx.getSessionVariable().skipDeleteBitmap = false;
ctx.getSessionVariable().skipDeleteSign = false;
ctx.getSessionVariable().skipStorageEngineMerge = false;
ctx.getSessionVariable().showHiddenColumns = false;
ctx.getSessionVariable().allowModifyMaterializedViewData = true;
// Disable add default limit rule to avoid refresh data wrong
ctx.getSessionVariable().setDisableNereidsRules(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ private LogicalPlan makeOlapScan(TableIf table, UnboundRelation unboundRelation,
unboundRelation.getTableSample());
}
}
if (!tabletIds.isEmpty()) {
// These tabletIds are specified manually in the query, so record them as manuallySpecifiedTabletIds
scan = scan.withManuallySpecifiedTabletIds(tabletIds);
}
if (needGenerateLogicalAggForRandomDistAggTable(scan)) {
// it's a random distribution agg table
// add agg on olap scan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
* @param cascadesContext current cascadesContext in the planner
*/
protected void doInitMaterializationContext(CascadesContext cascadesContext) {
if (cascadesContext.getConnectContext().getSessionVariable().isInDebugMode()) {
LOG.info(String.format("MaterializationContext init return because is in debug mode, current queryId is %s",
cascadesContext.getConnectContext().getQueryIdentifier()));
return;
}
// Only collect the table or mv which query use directly, to avoid useless mv partition in rewrite
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,15 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

private static final class TableQueryOperatorChecker extends DefaultPlanVisitor<Boolean, Void> {
/**
* Checks whether the query contains a table query operator.
* SQL such as the following should return true:
* select * from orders TABLET(10098), because of TABLET(10098)
* select * from orders_partition PARTITION (day_2), because of PARTITION (day_2)
* select * from orders index query_index_test, because of index query_index_test
* select * from orders TABLESAMPLE(20 percent), because of TABLESAMPLE(20 percent)
* */
public static final class TableQueryOperatorChecker extends DefaultPlanVisitor<Boolean, Void> {
public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker();

@Override
Expand All @@ -358,12 +366,20 @@ public Boolean visitLogicalRelation(LogicalRelation relation, Void context) {
if (relation instanceof LogicalOlapScan) {
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) relation;
if (logicalOlapScan.getTableSample().isPresent()) {
// Contain sample, select * from orders TABLESAMPLE(20 percent)
return true;
}
if (!logicalOlapScan.getSelectedTabletIds().isEmpty()) {
if (!logicalOlapScan.getManuallySpecifiedTabletIds().isEmpty()) {
// Contain tablets, select * from orders TABLET(10098) because TABLET(10098)
return true;
}
if (!logicalOlapScan.getManuallySpecifiedPartitions().isEmpty()) {
// Contain specified partitions, select * from orders_partition PARTITION (day_2)
return true;
}
if (logicalOlapScan.getSelectedIndexId() != logicalOlapScan.getTable().getBaseIndexId()) {
// Contains select index or use sync mv in rbo rewrite
// select * from orders index query_index_test
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.TableQueryOperatorChecker;
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.trees.copier.DeepCopierContext;
import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.ObjectId;
Expand Down Expand Up @@ -323,6 +325,11 @@ public static StructInfo of(Plan originalPlan, @Nullable Plan topPlan, @Nullable
cascadesContext);
valid = valid
&& hyperGraph.getNodes().stream().allMatch(n -> ((StructInfoNode) n).getExpressions() != null);
// if relationList has any relation which contains a table operator,
// such as a query with sample, index, tablet or partition, the struct info is invalid
boolean invalid = relationList.stream().anyMatch(relation ->
((AbstractPlan) relation).accept(TableQueryOperatorChecker.INSTANCE, null));
valid = valid && !invalid;
// collect predicate from top plan which not in hyper graph
Set<Expression> topPlanPredicates = new LinkedHashSet<>();
topPlan.accept(PREDICATE_COLLECTOR, topPlanPredicates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@ public Rule build() {
return logicalOlapScan().thenApply(ctx -> {
LogicalOlapScan scan = ctx.root;
OlapTable table = scan.getTable();
List<Long> ids = table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds());
List<Long> partitionIdsToPrune = scan.getSelectedPartitionIds();
List<Long> ids = table.selectNonEmptyPartitionIds(partitionIdsToPrune);
if (ids.isEmpty()) {
return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(),
scan.getOutput());
}
if (partitionIdsToPrune.equals(ids)) {
// Nothing was actually pruned, return directly
return null;
}
return scan.withSelectedPartitionIds(ids);
}).toRule(RuleType.PRUNE_EMPTY_PARTITION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Rule build() {
LogicalOlapScan olapScan = filter.child();
OlapTable table = olapScan.getTable();
Builder<Long> selectedTabletIdsBuilder = ImmutableList.builder();
if (olapScan.getSelectedTabletIds().isEmpty()) {
if (olapScan.getManuallySpecifiedTabletIds().isEmpty()) {
for (Long id : olapScan.getSelectedPartitionIds()) {
Partition partition = table.getPartition(id);
MaterializedIndex index = partition.getIndex(olapScan.getSelectedIndexId());
Expand All @@ -64,10 +64,10 @@ public Rule build() {
partition.getDistributionInfo()));
}
} else {
selectedTabletIdsBuilder.addAll(olapScan.getSelectedTabletIds());
selectedTabletIdsBuilder.addAll(olapScan.getManuallySpecifiedTabletIds());
}
List<Long> selectedTabletIds = selectedTabletIdsBuilder.build();
if (new HashSet<>(selectedTabletIds).equals(new HashSet<>(olapScan.getSelectedTabletIds()))) {
if (new HashSet<>(selectedTabletIds).equals(new HashSet<>(olapScan.getManuallySpecifiedTabletIds()))) {
return null;
}
return filter.withChildren(olapScan.withSelectedTabletIds(selectedTabletIds));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ public void analyze(ConnectContext ctx) throws Exception {
if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(mvName.getCtl())) {
throw new AnalysisException("Only support creating asynchronous materialized views in internal catalog");
}
if (ctx.getSessionVariable().isInDebugMode()) {
throw new AnalysisException("Create materialized view fail, because is in debug mode");
}
try {
FeNameFormat.checkTableName(mvName.getTbl());
} catch (org.apache.doris.common.AnalysisException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ public class LogicalOlapScan extends LogicalCatalogRelation implements OlapScan
*/
private final List<Long> selectedTabletIds;

/**
* Tablet ids to read data from that the user specified manually in the query,
* such as select * from orders TABLET(100);
*/
private final List<Long> manuallySpecifiedTabletIds;

///////////////////////////////////////////////////////////////////////////
// Members for partition ids.
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -127,20 +133,24 @@ public LogicalOlapScan(RelationId id, OlapTable table) {
this(id, table, ImmutableList.of());
}

/**
* LogicalOlapScan construct method
*/
public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
table.getPartitionIds(), false,
ImmutableList.of(),
-1, false, PreAggStatus.unset(), ImmutableList.of(), ImmutableList.of(),
Maps.newHashMap(), Optional.empty(), false, ImmutableMap.of());
Maps.newHashMap(), Optional.empty(), false, ImmutableMap.of(),
ImmutableList.of());
}

public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier, List<Long> tabletIds,
List<String> hints, Optional<TableSample> tableSample) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
table.getPartitionIds(), false, tabletIds,
-1, false, PreAggStatus.unset(), ImmutableList.of(), hints, Maps.newHashMap(),
tableSample, false, ImmutableMap.of());
tableSample, false, ImmutableMap.of(), ImmutableList.of());
}

public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier, List<Long> specifiedPartitions,
Expand All @@ -149,7 +159,7 @@ public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier, L
// must use specifiedPartitions here for prune partition by sql like 'select * from t partition p1'
specifiedPartitions, false, tabletIds,
-1, false, PreAggStatus.unset(), specifiedPartitions, hints, Maps.newHashMap(),
tableSample, false, ImmutableMap.of());
tableSample, false, ImmutableMap.of(), ImmutableList.of());
}

public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier, List<Long> tabletIds,
Expand All @@ -158,7 +168,8 @@ public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier, L
this(id, table, qualifier, Optional.empty(), Optional.empty(),
selectedPartitionIds, false, tabletIds,
selectedIndexId, true, preAggStatus,
specifiedPartitions, hints, Maps.newHashMap(), tableSample, true, ImmutableMap.of());
specifiedPartitions, hints, Maps.newHashMap(), tableSample, true, ImmutableMap.of(),
ImmutableList.of());
}

/**
Expand All @@ -171,7 +182,7 @@ public LogicalOlapScan(RelationId id, Table table, List<String> qualifier,
PreAggStatus preAggStatus, List<Long> specifiedPartitions,
List<String> hints, Map<Pair<Long, String>, Slot> cacheSlotWithSlotName,
Optional<TableSample> tableSample, boolean directMvScan,
Map<String, Set<List<String>>> colToSubPathsMap) {
Map<String, Set<List<String>>> colToSubPathsMap, List<Long> specifiedTabletIds) {
super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
groupExpression, logicalProperties);
Preconditions.checkArgument(selectedPartitionIds != null,
Expand All @@ -182,6 +193,7 @@ public LogicalOlapScan(RelationId id, Table table, List<String> qualifier,
this.indexSelected = indexSelected;
this.preAggStatus = preAggStatus;
this.manuallySpecifiedPartitions = ImmutableList.copyOf(specifiedPartitions);
this.manuallySpecifiedTabletIds = ImmutableList.copyOf(specifiedTabletIds);

if (selectedPartitionIds.isEmpty()) {
this.selectedPartitionIds = ImmutableList.of();
Expand Down Expand Up @@ -240,6 +252,7 @@ public boolean equals(Object o) {
&& partitionPruned == that.partitionPruned && Objects.equals(preAggStatus, that.preAggStatus)
&& Objects.equals(selectedTabletIds, that.selectedTabletIds)
&& Objects.equals(manuallySpecifiedPartitions, that.manuallySpecifiedPartitions)
&& Objects.equals(manuallySpecifiedTabletIds, that.manuallySpecifiedTabletIds)
&& Objects.equals(selectedPartitionIds, that.selectedPartitionIds)
&& Objects.equals(hints, that.hints)
&& Objects.equals(tableSample, that.tableSample);
Expand All @@ -248,8 +261,8 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), selectedIndexId, indexSelected, preAggStatus, cacheSlotWithSlotName,
selectedTabletIds, partitionPruned, manuallySpecifiedPartitions, selectedPartitionIds, hints,
tableSample);
selectedTabletIds, partitionPruned, manuallySpecifiedTabletIds, manuallySpecifiedPartitions,
selectedPartitionIds, hints, tableSample);
}

@Override
Expand All @@ -258,7 +271,7 @@ public LogicalOlapScan withGroupExpression(Optional<GroupExpression> groupExpres
groupExpression, Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

@Override
Expand All @@ -267,47 +280,55 @@ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpr
return new LogicalOlapScan(relationId, (Table) table, qualifier, groupExpression, logicalProperties,
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

public LogicalOlapScan withSelectedPartitionIds(List<Long> selectedPartitionIds) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, true, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

public LogicalOlapScan withMaterializedIndexSelected(long indexId) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds,
indexId, true, PreAggStatus.unset(), manuallySpecifiedPartitions, hints, cacheSlotWithSlotName,
tableSample, directMvScan, colToSubPathsMap);
tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

public LogicalOlapScan withSelectedTabletIds(List<Long> selectedTabletIds) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

public LogicalOlapScan withPreAggStatus(PreAggStatus preAggStatus) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

public LogicalOlapScan withColToSubPathsMap(Map<String, Set<List<String>>> colToSubPathsMap) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.empty(),
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap);
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

/**
 * Builds a new scan that carries the given manually specified tablet ids.
 * All other constructor arguments are passed through from this scan's current fields;
 * the group expression is reset to {@code Optional.empty()} and the current logical properties are reused.
 *
 * @param manuallySpecifiedTabletIds tablet ids named explicitly in the query,
 *         such as {@code select * from orders TABLET(100)}
 * @return a new {@code LogicalOlapScan} built from this scan with the given tablet ids
 */
public LogicalOlapScan withManuallySpecifiedTabletIds(List<Long> manuallySpecifiedTabletIds) {
return new LogicalOlapScan(relationId, (Table) table, qualifier,
Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
// the last argument is the method parameter, which shadows the field of the same name
hints, cacheSlotWithSlotName, tableSample, directMvScan, colToSubPathsMap, manuallySpecifiedTabletIds);
}

@Override
Expand All @@ -317,7 +338,7 @@ public LogicalOlapScan withRelationId(RelationId relationId) {
Optional.empty(), Optional.empty(),
selectedPartitionIds, false, selectedTabletIds,
selectedIndexId, indexSelected, preAggStatus, manuallySpecifiedPartitions,
hints, Maps.newHashMap(), tableSample, directMvScan, colToSubPathsMap);
hints, Maps.newHashMap(), tableSample, directMvScan, colToSubPathsMap, selectedTabletIds);
}

@Override
Expand All @@ -333,6 +354,10 @@ public List<Long> getSelectedTabletIds() {
return selectedTabletIds;
}

/**
 * Returns the tablet ids that were specified manually for this scan,
 * i.e. the value of the {@code manuallySpecifiedTabletIds} field.
 */
public List<Long> getManuallySpecifiedTabletIds() {
return manuallySpecifiedTabletIds;
}

@Override
public long getSelectedIndexId() {
return selectedIndexId;
Expand Down
Loading

0 comments on commit 4952a7c

Please sign in to comment.