Skip to content

Commit

Permalink
[feat](mtmv)Unified external table interface supporting partition ref…
Browse files Browse the repository at this point in the history
…resh and partition pruning (apache#44673)

- Add `MvccTable` to represent a table that supports querying specified
version data
- Add the `MvccSnapshot` interface to store snapshot information of mvcc
at a certain moment in time
- Add the `MvccSnapshot` parameter to the method of the
`MTMVRelatedTableIf `interface to retrieve data of a specified version
- Partition pruning related methods combined with the `MvccSnapshot`
parameter are used to obtain partition information for a specified
version
- Load the snapshot information of mvccTable at the beginning of the
query plan and store it in StatementContext

Unified external table interface supporting partition refresh and
partition pruning
  • Loading branch information
zddr committed Dec 2, 2024
1 parent efbd389 commit a616855
Show file tree
Hide file tree
Showing 22 changed files with 271 additions and 56 deletions.
23 changes: 21 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
Expand Down Expand Up @@ -953,6 +954,10 @@ public PartitionInfo getPartitionInfo() {
}

@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
return getPartitionColumnNames();
}

public Set<String> getPartitionColumnNames() throws DdlException {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
Expand Down Expand Up @@ -3117,11 +3122,20 @@ public long getVisibleVersionTime() {
}

@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}

public PartitionType getPartitionType() {
return partitionInfo.getType();
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
return getAndCopyPartitionItems();
}

public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException("get table read lock timeout, database=" + getDBName() + ",table=" + getName());
Expand All @@ -3141,12 +3155,17 @@ public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisExce
}

@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}

public List<Column> getPartitionColumns() {
return getPartitionInfo().getPartitionColumns();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
Expand All @@ -3156,7 +3175,7 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
Expand All @@ -55,7 +56,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

/**
Expand Down Expand Up @@ -373,39 +373,39 @@ public TableIndexes getTableIndexes() {
/**
* Retrieve all partitions and initialize SelectedPartitions
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return
*/
public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) {
public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) {
if (!supportPartitionPruned()) {
return SelectedPartitions.NOT_PRUNED;
}
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) {
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) {
return SelectedPartitions.NOT_PRUNED;
}
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshotId);
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshot);
return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
}

/**
* get partition map
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return partitionName ==> PartitionItem
*/
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return Collections.emptyMap();
}

/**
* get partition column list
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshotId if not support mvcc, ignore this
* @param snapshot if not support mvcc, ignore this
* @return
*/
public List<Column> getPartitionColumns(OptionalLong snapshotId) {
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
Expand Down Expand Up @@ -287,7 +288,6 @@ public List<Type> getPartitionColumnTypes() {
.orElse(Collections.emptyList());
}

@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Expand All @@ -296,7 +296,7 @@ public List<Column> getPartitionColumns() {
}

@Override
public List<Column> getPartitionColumns(OptionalLong snapshotId) {
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}

Expand All @@ -306,7 +306,7 @@ public boolean supportPartitionPruned() {
}

@Override
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}

Expand Down Expand Up @@ -755,34 +755,32 @@ public Set<String> getDistributionColumnNames() {
}

@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}

public PartitionType getPartitionType() {
return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}

@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
return getPartitionColumnNames();
}

public Set<String> getPartitionColumnNames() {
return getPartitionColumns().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}

@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
Map<String, PartitionItem> res = Maps.newHashMap();
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
res.put(idToName.get(entry.getKey()), entry.getValue());
}
return res;
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}

@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
Expand All @@ -794,7 +792,8 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
}

@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

/**
* The snapshot information of mvcc is defined by each table,
* but it should be ensured that the table information queried through this snapshot remains unchanged
*/
public interface MvccSnapshot {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

import org.apache.doris.catalog.TableIf;

/**
* The table that needs to query data based on the version needs to implement this interface.
*/
public interface MvccTable extends TableIf {
/**
* Retrieve the current snapshot information of the table,
* and the returned result will be used for the entire process of this query
*
* @return MvccSnapshot
*/
MvccSnapshot loadSnapshot();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.mvcc;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;

import com.google.common.base.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class MvccTableInfo {
private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class);

private String tableName;
private String dbName;
private String ctlName;

public MvccTableInfo(TableIf table) {
java.util.Objects.requireNonNull(table, "table is null");
DatabaseIf database = table.getDatabase();
java.util.Objects.requireNonNull(database, "database is null");
CatalogIf catalog = database.getCatalog();
java.util.Objects.requireNonNull(database, "catalog is null");
this.tableName = table.getName();
this.dbName = database.getFullName();
this.ctlName = catalog.getName();
}

public String getTableName() {
return tableName;
}

public String getDbName() {
return dbName;
}

public String getCtlName() {
return ctlName;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MvccTableInfo that = (MvccTableInfo) o;
return Objects.equal(tableName, that.tableName) && Objects.equal(
dbName, that.dbName) && Objects.equal(ctlName, that.ctlName);
}

@Override
public int hashCode() {
return Objects.hashCode(tableName, dbName, ctlName);
}

@Override
public String toString() {
return "MvccTableInfo{"
+ "tableName='" + tableName + '\''
+ ", dbName='" + dbName + '\''
+ ", ctlName='" + ctlName + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PaimonExternalTable extends ExternalTable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void analyze(MTMVPartitionInfo mvPartitionInfo) throws AnalysisException
String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits));
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType();
PartitionType partitionType = relatedTable.getPartitionType(Optional.empty());
if (partitionType == PartitionType.RANGE) {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());
Expand Down
Loading

0 comments on commit a616855

Please sign in to comment.