Skip to content

Commit

Permalink
[feat](mtmv)Unified external table interface supporting partition ref…
Browse files Browse the repository at this point in the history
…resh and partition pruning (#44673)

### What problem does this PR solve?

- Add `MvccTable` to represent a table that supports querying specified
version data
- Add the `MvccSnapshot` interface to store snapshot information of mvcc
at a certain moment in time
- Add the `MvccSnapshot` parameter to the method of the
`MTMVRelatedTableIf `interface to retrieve data of a specified version
- Partition pruning related methods combined with the `MvccSnapshot`
parameter are used to obtain partition information for a specified
version
- Load the snapshot information of mvccTable at the beginning of the
query plan and store it in StatementContext

### Release note

Unified external table interface supporting partition refresh and
partition pruning
  • Loading branch information
zddr authored Nov 28, 2024
1 parent f1d37cb commit 913cda6
Show file tree
Hide file tree
Showing 23 changed files with 278 additions and 77 deletions.
7 changes: 3 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -364,7 +363,7 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
* @return mvPartitionName ==> mvPartitionKeyDesc
*/
public Map<String, PartitionKeyDesc> generateMvPartitionDescs() throws AnalysisException {
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems(OptionalLong.empty());
Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<String, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
Expand Down Expand Up @@ -393,7 +392,7 @@ public Pair<Map<String, Set<String>>, Map<String, String>> calculateDoublyPartit
Map<String, String> baseToMv = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
Set<String> basePartitionNames = relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(),
Sets.newHashSet());
Expand Down Expand Up @@ -426,7 +425,7 @@ public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisExce
Map<String, Set<String>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil
.generateRelatedPartitionDescs(mvPartitionInfo, mvProperties);
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(OptionalLong.empty());
Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));
Expand Down
25 changes: 21 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
Expand Down Expand Up @@ -110,7 +111,6 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1049,6 +1049,10 @@ public PartitionInfo getPartitionInfo() {
}

@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
return getPartitionColumnNames();
}

public Set<String> getPartitionColumnNames() throws DdlException {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
Expand Down Expand Up @@ -3251,12 +3255,21 @@ public long getVisibleVersionTime() {
}

@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}

public PartitionType getPartitionType() {
return partitionInfo.getType();
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) throws AnalysisException {
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
}
Expand All @@ -3275,13 +3288,17 @@ public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshot
}

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}

public List<Column> getPartitionColumns() {
return getPartitionInfo().getPartitionColumns();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId)
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
Expand All @@ -3291,7 +3308,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId) {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand All @@ -55,7 +56,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

/**
Expand Down Expand Up @@ -373,39 +373,39 @@ public TableIndexes getTableIndexes() {
/**
* Retrieve all partitions and initialize SelectedPartitions
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return
*/
public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) {
public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) {
if (!supportPartitionPruned()) {
return SelectedPartitions.NOT_PRUNED;
}
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) {
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) {
return SelectedPartitions.NOT_PRUNED;
}
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshotId);
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshot);
return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
}

/**
* get partition map
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return partitionName ==> PartitionItem
*/
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return Collections.emptyMap();
}

/**
* get partition column list
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return
*/
public List<Column> getPartitionColumns(OptionalLong snapshotId) {
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
Expand Down Expand Up @@ -83,7 +84,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -288,7 +288,6 @@ public List<Type> getPartitionColumnTypes() {
.orElse(Collections.emptyList());
}

@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Expand All @@ -297,7 +296,7 @@ public List<Column> getPartitionColumns() {
}

@Override
public List<Column> getPartitionColumns(OptionalLong snapshotId) {
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}

Expand All @@ -307,7 +306,7 @@ public boolean supportPartitionPruned() {
}

@Override
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}

Expand Down Expand Up @@ -756,34 +755,32 @@ public Set<String> getDistributionColumnNames() {
}

@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}

public PartitionType getPartitionType() {
return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
return getPartitionColumnNames();
}

public Set<String> getPartitionColumnNames() {
return getPartitionColumns().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(OptionalLong snapshotId) {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
Map<String, PartitionItem> res = Maps.newHashMap();
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
res.put(idToName.get(entry.getKey()), entry.getValue());
}
return res;
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
OptionalLong snapshotId) throws AnalysisException {
Optional<MvccSnapshot> snapshot) throws AnalysisException {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -795,7 +792,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, OptionalLong snapshotId)
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

/**
* The snapshot information of mvcc is defined by each table,
* but it should be ensured that the table information queried through this snapshot remains unchanged
*/
public interface MvccSnapshot {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

import org.apache.doris.catalog.TableIf;

/**
* The table that needs to query data based on the version needs to implement this interface.
*/
public interface MvccTable extends TableIf {
/**
* Retrieve the current snapshot information of the table,
* and the returned result will be used for the entire process of this query
*
* @return MvccSnapshot
*/
MvccSnapshot loadSnapshot();
}
Loading

0 comments on commit 913cda6

Please sign in to comment.