diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 825bdef9f09819..955bfd4279fd5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -59,7 +59,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -364,7 +363,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() { * @return mvPartitionName ==> mvPartitionKeyDesc */ public Map generateMvPartitionDescs() throws AnalysisException { - Map mtmvItems = getAndCopyPartitionItems(OptionalLong.empty()); + Map mtmvItems = getAndCopyPartitionItems(); Map result = Maps.newHashMap(); for (Entry entry : mtmvItems.entrySet()) { result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); @@ -393,7 +392,7 @@ public Pair>, Map> calculateDoublyPartit Map baseToMv = Maps.newHashMap(); Map> relatedPartitionDescs = MTMVPartitionUtil .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); - Map mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty()); + Map mvPartitionItems = getAndCopyPartitionItems(); for (Entry entry : mvPartitionItems.entrySet()) { Set basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()); @@ -426,7 +425,7 @@ public Map> calculatePartitionMappings() throws AnalysisExce Map> res = Maps.newHashMap(); Map> relatedPartitionDescs = MTMVPartitionUtil .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); - Map mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty()); + Map mvPartitionItems = getAndCopyPartitionItems(); for (Entry entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index ec3bd2acbc57d6..5d57540017f067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -50,6 +50,7 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; @@ -110,7 +111,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -1049,6 +1049,10 @@ public PartitionInfo getPartitionInfo() { } @Override + public Set getPartitionColumnNames(Optional snapshot) throws DdlException { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() throws DdlException { Set partitionColumnNames = Sets.newHashSet(); if (partitionInfo instanceof SinglePartitionInfo) { @@ -3251,12 +3255,21 @@ public long getVisibleVersionTime() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return partitionInfo.getType(); } @Override - public Map getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException { + public Map getAndCopyPartitionItems(Optional snapshot) + throws AnalysisException { + return getAndCopyPartitionItems(); + } + + public Map getAndCopyPartitionItems() throws AnalysisException { if (!tryReadLock(1, TimeUnit.MINUTES)) { throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName()); } @@ -3275,13 +3288,17 @@ public Map getAndCopyPartitionItems(OptionalLong snapshot } @Override + public List getPartitionColumns(Optional snapshot) { + return getPartitionColumns(); + } + public List getPartitionColumns() { return getPartitionInfo().getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, - OptionalLong snapshotId) + Optional snapshot) throws AnalysisException { Map partitionVersions = context.getBaseVersions().getPartitionVersions(); long partitionId = getPartitionOrAnalysisException(partitionName).getId(); @@ -3291,7 +3308,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) { + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) { Map tableVersions = context.getBaseVersions().getTableVersions(); long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion, id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 3aee5550acf646..d82959954f2607 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; @@ -55,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; /** @@ -373,17 +373,17 @@ public TableIndexes getTableIndexes() { /** * Retrieve all partitions and initialize SelectedPartitions * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { + public SelectedPartitions initSelectedPartitions(Optional snapshot) { if (!supportPartitionPruned()) { return SelectedPartitions.NOT_PRUNED; } - if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) { + if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) { return SelectedPartitions.NOT_PRUNED; } - Map nameToPartitionItems = getNameToPartitionItems(snapshotId); + Map nameToPartitionItems = getNameToPartitionItems(snapshot); return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false); } @@ -391,10 +391,10 @@ public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) { * get partition map * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } @@ -402,10 +402,10 @@ public Map getNameToPartitionItems(OptionalLong snapshotI * get partition column list * If partition related operations are supported, this method needs to be implemented in the subclass * - * @param snapshotId if not support mvcc, ignore this + * @param snapshot if not support mvcc, ignore this * @return */ - public List getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return Collections.emptyList(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 1c30fa24cfb51e..6d65f8bcdbccb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRefreshContext; @@ -83,7 +84,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -288,7 +288,6 @@ public List getPartitionColumnTypes() { .orElse(Collections.emptyList()); } - @Override public List getPartitionColumns() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); @@ -297,7 +296,7 @@ public List getPartitionColumns() { } @Override - public List getPartitionColumns(OptionalLong snapshotId) { + public List getPartitionColumns(Optional snapshot) { return getPartitionColumns(); } @@ -307,7 +306,7 @@ public boolean supportPartitionPruned() { } @Override - public Map getNameToPartitionItems(OptionalLong snapshotId) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } @@ -756,34 +755,32 @@ public Set getDistributionColumnNames() { } @Override + public PartitionType getPartitionType(Optional snapshot) { + return getPartitionType(); + } + public PartitionType getPartitionType() { return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override + public Set getPartitionColumnNames(Optional snapshot) { + return getPartitionColumnNames(); + } + public Set getPartitionColumnNames() { return getPartitionColumns().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public Map getAndCopyPartitionItems(OptionalLong snapshotId) { - HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() - .getMetaStoreCache((HMSExternalCatalog) getCatalog()); - HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( - getDbName(), getName(), getPartitionColumnTypes()); - Map res = Maps.newHashMap(); - Map idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); - BiMap idToName = hivePartitionValues.getPartitionNameToIdMap().inverse(); - for (Entry entry : idToPartitionItem.entrySet()) { - res.put(idToName.get(entry.getKey()), entry.getValue()); - } - return res; + public Map getAndCopyPartitionItems(Optional snapshot) { + return getNameToPartitionItems(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, - OptionalLong snapshotId) throws AnalysisException { + Optional snapshot) throws AnalysisException { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -795,7 +792,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java new file mode 100644 index 00000000000000..d7826b0a5de19e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccSnapshot.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +/** + * The snapshot information of mvcc is defined by each table, + * but it should be ensured that the table information queried through this snapshot remains unchanged + */ +public interface MvccSnapshot { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java new file mode 100644 index 00000000000000..d69e0f3114df0c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; + +/** + * The table that needs to query data based on the version needs to implement this interface. + */ +public interface MvccTable extends TableIf { + /** + * Retrieve the current snapshot information of the table, + * and the returned result will be used for the entire process of this query + * + * @return MvccSnapshot + */ + MvccSnapshot loadSnapshot(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java new file mode 100644 index 00000000000000..0d865f837c8c4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTableInfo.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; + +import com.google.common.base.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MvccTableInfo { + private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class); + + private String tableName; + private String dbName; + private String ctlName; + + public MvccTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); + DatabaseIf database = table.getDatabase(); + java.util.Objects.requireNonNull(database, "database is null"); + CatalogIf catalog = database.getCatalog(); + java.util.Objects.requireNonNull(database, "catalog is null"); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MvccTableInfo that = (MvccTableInfo) o; + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } + + @Override + public int hashCode() { + return Objects.hashCode(tableName, dbName, ctlName); + } + + @Override + public String toString() { + return "MvccTableInfo{" + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 632a0da0ebd316..7fe3c858448e3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; @@ -67,7 +68,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -313,29 +313,29 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { } @Override - public Map getAndCopyPartitionItems(OptionalLong snapshotId) { + public Map getAndCopyPartitionItems(Optional snapshot) { return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); } @Override - public PartitionType getPartitionType() { + public PartitionType getPartitionType(Optional snapshot) { return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED; } @Override - public Set getPartitionColumnNames() { + public Set getPartitionColumnNames(Optional snapshot) { return getPartitionColumnsFromCache().stream() .map(c -> c.getName().toLowerCase()).collect(Collectors.toSet()); } @Override - public List getPartitionColumns() { + public List getPartitionColumns(Optional snapshot) { return getPartitionColumnsFromCache(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, - OptionalLong snapshotId) + Optional snapshot) throws AnalysisException { PaimonPartition paimonPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); if (paimonPartition == null) { @@ -345,7 +345,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont } @Override - public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java index ea15c84d1b925d..95a8717e01c4c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionExprDateTrunc.java @@ -69,7 +69,7 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits)); } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { Type partitionColumnType = MTMVPartitionUtil .getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index b3cd239269abc7..7eae44db0af4cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -25,6 +25,7 @@ import com.google.gson.annotations.SerializedName; import java.util.List; +import java.util.Optional; /** * MTMVPartitionInfo @@ -115,7 +116,7 @@ public int getRelatedColPos() throws AnalysisException { if (partitionType == MTMVPartitionType.SELF_MANAGE) { throw new AnalysisException("partitionType is: " + partitionType); } - List partitionColumns = getRelatedTable().getPartitionColumns(); + List partitionColumns = getRelatedTable().getPartitionColumns(Optional.empty()); for (int i = 0; i < partitionColumns.size(); i++) { if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) { return i; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 1bbc51fb004c57..8ba022de415006 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -50,7 +50,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.OptionalLong; +import java.util.Optional; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -330,7 +330,7 @@ public static boolean isSyncWithPartitions(MTMVRefreshContext context, String mt } for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty()); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -447,7 +447,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt if (!baseTable.needAutoRefresh()) { return true; } - MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, OptionalLong.empty()); + MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty()); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } @@ -483,7 +483,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionName, context, OptionalLong.empty()); + .getPartitionSnapshot(relatedPartitionName, context, Optional.empty()); refreshPartitionSnapshot.getPartitions() .put(relatedPartitionName, partitionSnapshot); } @@ -498,13 +498,13 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres continue; } refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, - ((MTMVRelatedTableIf) table).getTableSnapshot(context, OptionalLong.empty())); + ((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty())); } return refreshPartitionSnapshot; } public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException { - List partitionColumns = relatedTable.getPartitionColumns(); + List partitionColumns = relatedTable.getPartitionColumns(Optional.empty()); for (Column column : partitionColumns) { if (column.getName().equals(col)) { return column.getType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java index ef3100dec4c732..c6b4e331184e2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescInitGenerator.java @@ -20,7 +20,7 @@ import org.apache.doris.common.AnalysisException; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; /** * get all related partition descs @@ -30,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(OptionalLong.empty())); + lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(Optional.empty())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java index 76e20ef70f5d92..325fab819d9a09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; /** @@ -45,7 +46,7 @@ public void apply(MTMVPartitionInfo mvPartitionInfo, Map mvPrope return; } MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable(); - PartitionType partitionType = relatedTable.getPartitionType(); + PartitionType partitionType = relatedTable.getPartitionType(Optional.empty()); if (partitionType == PartitionType.RANGE) { lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo)); } else if (partitionType == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index e18784ae253a0f..c4261aa78f10be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -23,10 +23,11 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.mvcc.MvccSnapshot; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import java.util.Set; /** @@ -39,32 +40,35 @@ public interface MTMVRelatedTableIf extends TableIf { * Note: This method is called every time there is a refresh and transparent rewrite, * so if this method is slow, it will significantly reduce query performance * - * @param snapshotId + * @param snapshot * @return partitionName->PartitionItem */ - Map getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException; + Map getAndCopyPartitionItems(Optional snapshot) throws AnalysisException; /** * getPartitionType LIST/RANGE/UNPARTITIONED * + * @param snapshot * @return */ - PartitionType getPartitionType(); + PartitionType getPartitionType(Optional snapshot); /** * getPartitionColumnNames * + * @param snapshot * @return * @throws DdlException */ - Set getPartitionColumnNames() throws DdlException; + Set getPartitionColumnNames(Optional snapshot) throws DdlException; /** * getPartitionColumns * + * @param snapshot * @return */ - List getPartitionColumns(); + List getPartitionColumns(Optional snapshot); /** * getPartitionSnapshot @@ -72,14 +76,14 @@ public interface MTMVRelatedTableIf extends TableIf { * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * - * @param snapshotId + * @param snapshot * @param partitionName * @param context * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, OptionalLong snapshotId) - throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional snapshot) throws AnalysisException; /** * getTableSnapshot @@ -87,12 +91,13 @@ MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext con * If snapshots have already been obtained in bulk in the context, * the results should be obtained directly from the context * - * @param snapshotId + * @param snapshot * @param context * @return table snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) throws AnalysisException; + MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) + throws AnalysisException; /** * Does the current type of table allow timed triggering diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index a5c966370f030d..17ae5883063fb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -70,6 +70,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import org.apache.commons.collections.MapUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -462,6 +463,13 @@ private Set> getTables(LogicalPlan logicalPlan) { return tableNames; } + public Map, TableIf> getOrExtractTables(LogicalPlan logicalPlan) { + if (MapUtils.isEmpty(tables)) { + extractTables(logicalPlan); + } + return tables; + } + private Set> extractTableNamesFromHaving(LogicalHaving having) { Set subqueryExprs = having.getPredicate() .collect(SubqueryExpr.class::isInstance); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 8fb0f6e77976da..c7478411a5de11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -216,7 +216,7 @@ public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, plan = preprocess(plan); initCascadesContext(plan, requireProperties); - + statementContext.loadSnapshots(cascadesContext.getOrExtractTables(plan)); try (Lock lock = new Lock(plan, cascadesContext)) { Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties); lockCallback.accept(resultPlan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 6b6e335b74a676..b172f9dc591bd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -24,6 +24,9 @@ import org.apache.doris.common.Id; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.nereids.hint.Hint; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.rules.analysis.ColumnAliasGenerator; @@ -174,6 +177,8 @@ public class StatementContext implements Closeable { private Backend groupCommitMergeBackend; + private final Map snapshots = Maps.newHashMap(); + private boolean privChecked; public StatementContext() { @@ -510,6 +515,32 @@ public void addPlannerHook(PlannerHook plannerHook) { this.plannerHooks.add(plannerHook); } + /** + * Load snapshot information of mvcc + * + * @param tables Tables used in queries + */ + public void loadSnapshots(Map, TableIf> tables) { + if (tables == null) { + return; + } + for (TableIf tableIf : tables.values()) { + if (tableIf instanceof MvccTable) { + snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + } + } + } + + /** + * Obtain snapshot information of mvcc + * + * @param mvccTable mvccTable + * @return MvccSnapshot + */ + public MvccSnapshot getSnapshot(MvccTable mvccTable) { + return snapshots.get(new MvccTableInfo(mvccTable)); + } + private static class CloseableResource implements Closeable { public final String resourceName; public final String threadName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index a659c2f9990a3f..484abd11f01e72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -481,13 +481,13 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte return null; } MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table; - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (PartitionType.UNPARTITIONED.equals(type)) { context.addFailReason(String.format("related base table is not partition table, the table is %s", table.getName())); return null; } - Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns()); + Set partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns(Optional.empty())); Column mvReferenceColumn = contextPartitionColumn.getColumn().get(); Expr definExpr = mvReferenceColumn.getDefineExpr(); if (definExpr instanceof SlotRef) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index d50219383072df..4bbb0a8aa76270 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.OptionalLong; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,7 +75,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(OptionalLong.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -84,7 +84,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(OptionalLong.empty()) + List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index de5e188d5a65bf..8c44b42a5ccfbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -351,7 +351,7 @@ private PartitionDesc generatePartitionDesc(ConnectContext ctx) { allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum())); } try { - PartitionType type = relatedTable.getPartitionType(); + PartitionType type = relatedTable.getPartitionType(Optional.empty()); if (type == PartitionType.RANGE) { return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), allPartitionDescs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java index 427e2368e7ab2b..c4117e8608e29d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/MTMVPartitionDefinition.java @@ -54,6 +54,7 @@ import com.google.common.collect.Sets; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -147,7 +148,7 @@ private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectCont MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo()); Set partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); try { - partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames()); + partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty())); } catch (DdlException e) { throw new AnalysisException(e.getMessage(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 010c30d915d529..96b8e032d11274 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -36,7 +36,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; /** * Logical file scan for external catalog. @@ -64,7 +63,7 @@ public LogicalFileScan(RelationId id, ExternalTable table, List qualifie Optional tableSample, Optional tableSnapshot) { // todo: real snapshotId this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(OptionalLong.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 96ac59b81216bc..e5d2e21a8db626 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -35,7 +35,7 @@ import org.junit.Test; import java.util.List; -import java.util.OptionalLong; +import java.util.Optional; import java.util.Set; public class MTMVPartitionUtilTest { @@ -113,7 +113,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (OptionalLong) any); + baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf; @@ -133,7 +133,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (OptionalLong) any); + baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (Optional) any); minTimes = 0; result = baseSnapshotIf;