diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 690ab88991bd16e..b5e90634b8bbb1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -60,14 +60,9 @@ public boolean isDefaultPartition() {

     @Override
     public PartitionKeyDesc toPartitionKeyDesc() {
-        if (partitionKeyRange.hasLowerBound()) {
-            return PartitionKeyDesc.createFixed(
+        return PartitionKeyDesc.createFixed(
                 PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
                 PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
-        } else {
-            // For null partition value.
-            return PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
-        }
     }

     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index da4670d6d0589d9..9d3b84d2f76a885 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -543,7 +543,7 @@ public Optional<SchemaCacheValue> initSchema() {
     }

     private List<Column> getIcebergSchema() {
-        return IcebergUtils.getSchema(catalog, dbName, name);
+        return IcebergUtils.getSchema(catalog, dbName, name, IcebergUtils.LATEST_SNAPSHOT_ID);
     }

     private List<Column> getHudiSchema() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index e259399f63740bf..9f43413cc4c2896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -27,9 +27,14 @@
 import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -77,7 +82,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;

-public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {
+public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {

     public static final String YEAR = "year";
     public static final String MONTH = "month";
@@ -102,6 +107,8 @@ public String getIcebergCatalogType() {
     protected synchronized void makeSureInitialized() {
         super.makeSureInitialized();
         if (!objectCreated) {
+            IcebergSchemaCacheKey key = new IcebergSchemaCacheKey(dbName, name, IcebergUtils.LATEST_SNAPSHOT_ID);
+            initSchema(key);
             objectCreated = true;
         }
     }
@@ -117,39 +124,23 @@ public void setPartitionColumns(List<Column> partitionColumns) {
     }

     @Override
-    public Optional<SchemaCacheValue> initSchema() {
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         table = IcebergUtils.getIcebergTable(catalog, dbName, name);
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
-        Snapshot snapshot = table.currentSnapshot();
-        if (snapshot == null) {
-            LOG.debug("Table {} is empty", name);
-            return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null));
-        }
-        long snapshotId = snapshot.snapshotId();
-        partitionColumns = null;
-        IcebergPartitionInfo partitionInfo = null;
-        if (isValidRelatedTable()) {
-            PartitionSpec spec = table.spec();
-            partitionColumns = Lists.newArrayList();
-
-            // For iceberg table, we only support table with 1 partition column as RelatedTable.
-            // So we use spec.fields().get(0) to get the partition column.
-            Types.NestedField col = table.schema().findField(spec.fields().get(0).sourceId());
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
+                ((IcebergSchemaCacheKey) key).getSnapshotId());
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
             for (Column c : schema) {
                 if (c.getName().equalsIgnoreCase(col.name())) {
-                    partitionColumns.add(c);
+                    tmpColumns.add(c);
                     break;
                 }
             }
-            Preconditions.checkState(partitionColumns.size() == 1,
-                    "Support 1 partition column for iceberg table, but found " + partitionColumns.size());
-            try {
-                partitionInfo = loadPartitionInfo();
-            } catch (AnalysisException e) {
-                LOG.warn("Failed to load iceberg table {} partition info.", name, e);
-            }
         }
-        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns, snapshotId, partitionInfo));
+        partitionColumns = tmpColumns;
+        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns));
     }

     @Override
@@ -187,6 +178,12 @@ public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }

+    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
+        makeSureInitialized();
+        return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, name);
+    }
+
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
         Env.getCurrentEnv().getRefreshManager()
@@ -195,16 +192,14 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {

     @Override
     public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+        makeSureInitialized();
+        return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
     }

-    private IcebergPartitionInfo getPartitionInfoFromCache() {
+    @Override
+    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            return new IcebergPartitionInfo();
-        }
-        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+        return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
     }

     @Override
@@ -215,26 +210,26 @@ public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {

     @Override
     public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
-        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+        makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshotId());
+        return schemaValue.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());
     }

     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        return getPartitionColumnsFromCache();
-    }
-
-    private List<Column> getPartitionColumnsFromCache() {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        return schemaCacheValue
-                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
-                .orElseGet(Lists::newArrayList);
+        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshotId());
+        return schemaValue.getPartitionColumns();
     }

     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
             Optional<MvccSnapshot> snapshot) throws AnalysisException {
-        long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName);
+        makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + partitionName);
         }
@@ -244,16 +239,9 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont
     @Override
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
-        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
-    }
-
-    public long getLatestSnapshotIdFromCache() throws AnalysisException {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            throw new AnalysisException("Can't find schema cache of table " + name);
-        }
-        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        return new MTMVVersionSnapshot(snapshotValue.getSnapshotId());
     }

     @Override
@@ -268,6 +256,7 @@ public boolean isPartitionColumnAllowNull() {
      */
    @Override
    public boolean isValidRelatedTable() {
+        makeSureInitialized();
        if (isValidRelatedTableCached) {
            return isValidRelatedTable;
        }
@@ -299,8 +288,47 @@ public boolean isValidRelatedTable() {
         return isValidRelatedTable;
     }

-    protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException {
-        List<IcebergPartition> icebergPartitions = loadIcebergPartition();
+    @Override
+    public MvccSnapshot loadSnapshot() {
+        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
+    }
+
+    public long getLatestSnapshotId() {
+        if (table == null) {
+            table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+        }
+        Snapshot snapshot = table.currentSnapshot();
+        return snapshot == null ? IcebergUtils.LATEST_SNAPSHOT_ID : table.currentSnapshot().snapshotId();
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
+        IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
+        return getIcebergSchemaCacheValue(cacheValue.getSnapshotId()).getSchema();
+    }
+
+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long snapshotId) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new IcebergSchemaCacheKey(dbName, name, snapshotId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, snapshotId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
+        if (!isValidRelatedTable() || snapshotId == IcebergUtils.LATEST_SNAPSHOT_ID) {
+            return new IcebergPartitionInfo();
+        }
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
         Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
         Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
         for (IcebergPartition partition : icebergPartitions) {
@@ -314,11 +342,11 @@ protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException {
         return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
     }

-    public List<IcebergPartition> loadIcebergPartition() {
+    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
         PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
                 .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
         List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().planFiles()) {
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
             for (FileScanTask task : tasks) {
                 CloseableIterable<StructLike> rows = task.asDataTask().rows();
                 for (StructLike row : rows) {
@@ -384,11 +412,12 @@ public IcebergPartition generateIcebergPartition(StructLike row) {

     @VisibleForTesting
     public Range<PartitionKey> getPartitionRange(String value, String transform) throws AnalysisException {
-        // For NULL value, create a lessThan partition for it.
+        // For NULL value, create a minimum partition for it.
         if (value == null) {
-            PartitionKey nullKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-02")), partitionColumns);
-            return Range.lessThan(nullKey);
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
         }
         LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
         LocalDateTime target;
@@ -525,4 +554,12 @@ public boolean validRelatedTableCache() {
     public void setIsValidRelatedTableCached(boolean isCached) {
         this.isValidRelatedTableCached = isCached;
     }
+
+    private IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return getIcebergSnapshotCacheValue();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index ad347ca78f2a4f3..45e01ae1cf8d8f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource.iceberg;

 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
@@ -49,6 +50,7 @@ public class IcebergMetadataCache {

     private final LoadingCache<IcebergMetadataCacheKey, List<Snapshot>> snapshotListCache;
     private final LoadingCache<IcebergMetadataCacheKey, Table> tableCache;
+    private final LoadingCache<IcebergMetadataCacheKey, IcebergSnapshotCacheValue> snapshotCache;

     public IcebergMetadataCache(ExecutorService executor) {
         CacheFactory snapshotListCacheFactory = new CacheFactory(
@@ -66,6 +68,14 @@ public IcebergMetadataCache(ExecutorService executor) {
                 true,
                 null);
         this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), null, executor);
+
+        CacheFactory snapshotCacheFactory = new CacheFactory(
+                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+                Config.max_external_table_cache_num,
+                true,
+                null);
+        this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor);
     }

     public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) throws UserException {
@@ -92,6 +102,11 @@ public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) {
         return restTable;
     }

+    public IcebergSnapshotCacheValue getSnapshotCache(CatalogIf catalog, String dbName, String tbName) {
+        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName);
+        return snapshotCache.get(key);
+    }
+
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName);
@@ -114,6 +129,15 @@ private Table loadTable(IcebergMetadataCacheKey key) {
                 () -> ops.loadTable(key.dbName, key.tableName));
     }

+    @NotNull
+    private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException {
+        IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName)
+                .getTableOrAnalysisException(key.tableName);
+        long snapshotId = table.getLatestSnapshotId();
+        IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId);
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, snapshotId);
+    }
+
     public void invalidateCatalogCache(long catalogId) {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId)
@@ -125,6 +149,10 @@ public void invalidateCatalogCache(long catalogId) {
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId)
+                .forEach(snapshotCache::invalidate);
     }

     public void invalidateTableCache(long catalogId, String dbName, String tblName) {
@@ -143,6 +171,11 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals(
+                        tblName))
+                .forEach(snapshotCache::invalidate);
     }

     public void invalidateDbCache(long catalogId, String dbName) {
@@ -159,6 +192,10 @@ public void invalidateDbCache(long catalogId, String dbName) {
                     ManifestFiles.dropCache(entry.getValue().io());
                     tableCache.invalidate(entry.getKey());
                 });
+
+        snapshotCache.asMap().keySet().stream()
+                .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName))
+                .forEach(snapshotCache::invalidate);
     }

     private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
@@ -212,10 +249,12 @@ public int hashCode() {

     public Map<String, Map<String, String>> getCacheStats() {
         Map<String, Map<String, String>> res = Maps.newHashMap();
-        res.put("iceberg_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(),
+        res.put("iceberg_snapshot_list_cache", ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(),
                 snapshotListCache.estimatedSize()));
         res.put("iceberg_table_cache", ExternalMetaCacheMgr.getCacheStats(tableCache.stats(),
                 tableCache.estimatedSize()));
+        res.put("iceberg_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
+                snapshotCache.estimatedSize()));
         return res;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
new file mode 100644
index 000000000000000..2c0155a71cd3893
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
+public class IcebergMvccSnapshot implements MvccSnapshot {
+    private final IcebergSnapshotCacheValue snapshotCacheValue;
+
+    public IcebergMvccSnapshot(IcebergSnapshotCacheValue snapshotCacheValue) {
+        this.snapshotCacheValue = snapshotCacheValue;
+    }
+
+    public IcebergSnapshotCacheValue getSnapshotCacheValue() {
+        return snapshotCacheValue;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
new file mode 100644
index 000000000000000..39976eaccf58f54
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+public class IcebergSchemaCacheKey extends SchemaCacheKey {
+    private final long snapshotId;
+
+    public IcebergSchemaCacheKey(String dbName, String tableName, long snapshotId) {
+        super(dbName, tableName);
+        this.snapshotId = snapshotId;
+    }
+
+    public long getSnapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof IcebergSchemaCacheKey)) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        IcebergSchemaCacheKey that = (IcebergSchemaCacheKey) o;
+        return snapshotId == that.snapshotId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(super.hashCode(), snapshotId);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
index e1fde8049fe1adc..ccfcaab0c7261d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java
@@ -25,26 +25,13 @@ public class IcebergSchemaCacheValue extends SchemaCacheValue {

     private final List<Column> partitionColumns;
-    private final IcebergPartitionInfo partitionInfo;
-    private final long snapshotId;

-    public IcebergSchemaCacheValue(List<Column> schema, List<Column> partitionColumns,
-            long snapshotId, IcebergPartitionInfo partitionInfo) {
+    public IcebergSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
         super(schema);
         this.partitionColumns = partitionColumns;
-        this.snapshotId = snapshotId;
-        this.partitionInfo = partitionInfo;
     }

     public List<Column> getPartitionColumns() {
         return partitionColumns;
     }
-
-    public IcebergPartitionInfo getPartitionInfo() {
-        return partitionInfo;
-    }
-
-    public long getSnapshotId() {
-        return snapshotId;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java
new file mode 100644
index 000000000000000..ca03e14145a72da
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+public class IcebergSnapshotCacheValue {
+
+    private final IcebergPartitionInfo partitionInfo;
+    private final long snapshotId;
+
+    public IcebergSnapshotCacheValue(IcebergPartitionInfo partitionInfo, long snapshotId) {
+        this.partitionInfo = partitionInfo;
+        this.snapshotId = snapshotId;
+    }
+
+    public IcebergPartitionInfo getPartitionInfo() {
+        return partitionInfo;
+    }
+
+    public long getSnapshotId() {
+        return snapshotId;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ba6d628e492c209..0ba8865f3326fb8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -107,6 +107,8 @@ public Integer initialValue() {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";

+    public static final long LATEST_SNAPSHOT_ID = -1;
+
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
             return null;
@@ -573,10 +575,12 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog
     /**
      * Get iceberg schema from catalog and convert them to doris schema
      */
-    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
+    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name, long snapshotId) {
         return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
             org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
-            Schema schema = icebergTable.schema();
+            Snapshot snapshot = snapshotId == LATEST_SNAPSHOT_ID
+                    ? icebergTable.currentSnapshot() : icebergTable.snapshot(snapshotId);
+            Schema schema = snapshot == null ? icebergTable.schema() : icebergTable.schemas().get(snapshot.schemaId());
             List<Types.NestedField> columns = schema.columns();
             List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
             for (Types.NestedField field : columns) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 80d0a7c2429df38..d773a48a6b14f15 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -28,6 +28,8 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
 import mockit.Mocked;
 import mockit.Verifications;
 import org.apache.iceberg.PartitionField;
@@ -52,6 +54,11 @@ public void testIsSupportedPartitionTable(@Mocked org.apache.iceberg.Table icebe
             @Mocked Schema schema) {
         IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null);
         Map<Integer, PartitionSpec> specs = Maps.newHashMap();
+        new MockUp<IcebergExternalTable>() {
+            @Mock
+            private void makeSureInitialized() {
+            }
+        };
         // Test null
         specs.put(0, null);
         new Expectations() {{
@@ -140,8 +147,9 @@ public void testGetPartitionRange() throws AnalysisException {

         // Test null partition value
         Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour");
-        Assertions.assertFalse(nullRange.hasLowerBound());
-        Assertions.assertEquals("0000-01-02 00:00:00",
+        Assertions.assertEquals("0000-01-01 00:00:00",
+                nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
+        Assertions.assertEquals("0000-01-01 00:00:01",
                 nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));

         // Test hour transform.
diff --git a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
index c9d9799da813000..483ac0957e6f679 100644
--- a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
@@ -103,3 +103,18 @@
 2024-09-30	6
 2024-10-28	7

+-- !refresh_one_partition --
+2024-01-01T00:00	4
+
+-- !refresh_one_partition_rewrite --
+2024-01-01T00:00	4
+2024-01-02T00:00	3
+
+-- !refresh_auto --
+2024-01-01T00:00	4
+2024-01-02T00:00	3
+
+-- !refresh_all_partition_rewrite --
+2024-01-01T00:00	4
+2024-01-02T00:00	3
+
diff --git a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
index 59cf1173acb46b1..aee80d8d1693a43 100644
--- a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
@@ -83,6 +83,7 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         String icebergDb = "iceberg_mtmv_partition"
         String icebergTable1 = "tstable"
         String icebergTable2 = "dtable"
+        String icebergTable3 = "union_test"
         sql """drop catalog if exists ${catalog_name} """
         sql """create catalog if not exists ${catalog_name} properties (
             'type'='iceberg',
@@ -210,6 +211,61 @@
         sql """drop materialized view if exists ${mvName2};"""
         sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}"""

+        // Test rewrite and union partitions
+        sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+        String mvSql = "SELECT par,count(*) as num FROM ${catalog_name}.${icebergDb}.${icebergTable3} group by par"
+        String mvName = "union_mv"
+        sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable3}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable3} (
+              id int,
+              value int,
+              par datetime
+            ) ENGINE=iceberg
+            PARTITION BY LIST (day(par)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} values (1, 1, "2024-01-01"), (2, 1, "2024-01-01"), (3, 1, "2024-01-01"), (4, 1, "2024-01-01")"""
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} values (1, 2, "2024-01-02"), (2, 2, "2024-01-02"), (3, 2, "2024-01-02")"""
+        sql """analyze table ${catalog_name}.${icebergDb}.${icebergTable3} with sync"""
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """
+            CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS ${mvSql}
+        """
+
+        def showPartitions = sql """show partitions from ${mvName}"""
+        logger.info("showPartitions: " + showPartitions.toString())
+        assertTrue(showPartitions.toString().contains("p_20240101000000_20240102000000"))
+        assertTrue(showPartitions.toString().contains("p_20240102000000_20240103000000"))
+
+        // refresh one partiton
+        sql """REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20240101000000_20240102000000);"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+        def explainOnePartition = sql """ explain ${mvSql} """
+        logger.info("explainOnePartition: " + explainOnePartition.toString())
+        assertTrue(explainOnePartition.toString().contains("VUNION"))
+        order_qt_refresh_one_partition_rewrite "${mvSql}"
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        //refresh auto
+        sql """REFRESH MATERIALIZED VIEW ${mvName} auto"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_auto "SELECT * FROM ${mvName} "
+        def explainAllPartition = sql """ explain ${mvSql}; """
+        logger.info("explainAllPartition: " + explainAllPartition.toString())
+        assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+        order_qt_refresh_all_partition_rewrite "${mvSql}"
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable3}"""
+
         sql """ drop catalog if exists ${catalog_name} """
     }
 }