Skip to content

Commit

Permalink
[opt](hms table)Some optimizations for hms external table (#44909)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:

1. Increase the schema cache to reduce the time to obtain the schema.
2. `HoodieTableMetaClient` is stored in `HMSExternalTable` to prevent
redundant creation.
3. Cache HoodieTableFileSystemView to speed up getting FileGroup or
FileSlice.
4. Fix analyze path for `file:/abc`.
5. Add `FSDataInputStreamWrapper` to solve hudi conflict class.
  • Loading branch information
wuwenchi authored Dec 26, 2024
1 parent 732725d commit ddb727f
Show file tree
Hide file tree
Showing 16 changed files with 1,136 additions and 142 deletions.
1 change: 1 addition & 0 deletions fe/check/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ under the License.
<suppress files="HiveVersionUtil\.java" checks="[a-zA-Z0-9]*"/>
<suppress files="[\\/]com[\\/]amazonaws[\\/]glue[\\/]catalog[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="[\\/]com[\\/]aliyun[\\/]datalake[\\/]metastore[\\/]hive2[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="FSDataInputStreamWrapper\.java" checks="[a-zA-Z0-9]*"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hudi.source.HudiPartitionMgr;
import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
Expand Down Expand Up @@ -88,7 +90,7 @@ public class ExternalMetaCacheMgr {
// catalog id -> table schema cache
private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
// hudi partition manager
private final HudiPartitionMgr hudiPartitionMgr;
private final HudiMetadataCacheMgr hudiMetadataCacheMgr;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
Expand Down Expand Up @@ -123,7 +125,7 @@ public ExternalMetaCacheMgr() {
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);

hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
Expand Down Expand Up @@ -165,7 +167,19 @@ public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
}

public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) {
return hudiPartitionMgr.getPartitionProcessor(catalog);
return hudiMetadataCacheMgr.getPartitionProcessor(catalog);
}

public HudiCachedFsViewProcessor getFsViewProcessor(ExternalCatalog catalog) {
return hudiMetadataCacheMgr.getFsViewProcessor(catalog);
}

public HudiCachedMetaClientProcessor getMetaClientProcessor(ExternalCatalog catalog) {
return hudiMetadataCacheMgr.getHudiMetaClientProcessor(catalog);
}

public HudiMetadataCacheMgr getHudiMetadataCacheMgr() {
return hudiMetadataCacheMgr;
}

public IcebergMetadataCache getIcebergMetadataCache() {
Expand Down Expand Up @@ -195,7 +209,7 @@ public void removeCache(long catalogId) {
if (schemaCacheMap.remove(catalogId) != null) {
LOG.info("remove schema cache for catalog {}", catalogId);
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
hudiMetadataCacheMgr.removeCache(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
paimonMetadataCacheMgr.removeCache(catalogId);
Expand All @@ -211,7 +225,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName)
if (metaCache != null) {
metaCache.invalidateTableCache(dbName, tblName);
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
hudiMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
Expand All @@ -230,7 +244,7 @@ public void invalidateDbCache(long catalogId, String dbName) {
if (metaCache != null) {
metaCache.invalidateDbCache(dbName);
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
hudiMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
Expand All @@ -248,7 +262,7 @@ public void invalidateCatalogCache(long catalogId) {
if (metaCache != null) {
metaCache.invalidateAll();
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
hudiMetadataCacheMgr.invalidateCatalogCache(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
Expand Down Expand Up @@ -74,6 +75,7 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -532,34 +534,38 @@ public long getLastDdlTime() {
@Override
public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
columns = getIcebergSchema();
return getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
columns = getHudiSchema();
return getHudiSchema();
} else {
columns = getHiveSchema();
return getHiveSchema();
}
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

private List<Column> getIcebergSchema() {
return IcebergUtils.getSchema(catalog, dbName, name, IcebergUtils.UNKNOWN_SNAPSHOT_ID);
private Optional<SchemaCacheValue> getIcebergSchema() {
List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

private List<Column> getHudiSchema() {
private Optional<SchemaCacheValue> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
List<String> colTypes = Lists.newArrayList();
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
true, null, true, null, "", true, null, -1, null));
colTypes.add(HudiUtils.convertAvroToHiveType(hudiField.schema()));
}
return tmpSchema;
List<Column> partitionColumns = initPartitionColumns(tmpSchema);
HudiSchemaCacheValue hudiSchemaCacheValue = new HudiSchemaCacheValue(tmpSchema, partitionColumns);
hudiSchemaCacheValue.setColTypes(colTypes);
return Optional.of(hudiSchemaCacheValue);
}

private List<Column> getHiveSchema() {
private Optional<SchemaCacheValue> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues = client.getDefaultColumnValues(dbName, name);
Expand All @@ -571,7 +577,8 @@ private List<Column> getHiveSchema() {
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
return columns;
List<Column> partitionColumns = initPartitionColumns(columns);
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}

@Override
Expand Down Expand Up @@ -649,9 +656,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
break;
}
default:
if (LOG.isDebugEnabled()) {
LOG.debug("get column stats for dlaType {} is not supported.", dlaType);
}
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
return Optional.empty();
}
Expand Down Expand Up @@ -1016,4 +1021,15 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

public HoodieTableMetaClient getHudiClient() {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getMetaClientProcessor(getCatalog())
.getHoodieTableMetaClient(
getDbName(),
getName(),
getRemoteTable().getSd().getLocation(),
getCatalog().getConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -805,9 +804,18 @@ public static String showCreateTable(org.apache.hadoop.hive.metastore.api.Table
}

public static Schema getHudiTableSchema(HMSExternalTable table) {
HoodieTableMetaClient metaClient = getHudiClient(table);
HoodieTableMetaClient metaClient = table.getHudiClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
Schema hudiSchema;

// Here, the timestamp should be reloaded again.
// Because when hudi obtains the schema in `getTableAvroSchema`, it needs to read the specified commit file,
// which is saved in the `metaClient`.
// But the `metaClient` is obtained from cache, so the file obtained may be an old file.
// This file may be deleted by hudi clean task, and an error will be reported.
// So, we should reload timeline so that we can read the latest commit files.
metaClient.reloadActiveTimeline();

try {
hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
Expand All @@ -833,14 +841,6 @@ public static <T> T ugiDoAs(Configuration conf, PrivilegedExceptionAction<T> act
}
}

public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
String hudiBasePath = table.getRemoteTable().getSd().getLocation();
Configuration conf = getConfiguration(table);
HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return ugiDoAs(conf, () -> HoodieTableMetaClient.builder().setConf(hadoopStorageConfiguration)
.setBasePath(hudiBasePath).build());
}

public static Configuration getConfiguration(HMSExternalTable table) {
return table.getCatalog().getConfiguration();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hudi;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSSchemaCacheValue;

import java.util.List;

public class HudiSchemaCacheValue extends HMSSchemaCacheValue {

private List<String> colTypes;

public HudiSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) {
super(schema, partitionColumns);
}

public List<String> getColTypes() {
return colTypes;
}

public void setColTypes(List<String> colTypes) {
this.colTypes = colTypes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -251,7 +253,7 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
return partitionValues;
}

HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();
Expand Down Expand Up @@ -281,4 +283,12 @@ public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> ta
}
return partitionValues;
}

public static HoodieTableMetaClient buildHudiTableMetaClient(String hudiBasePath, Configuration conf) {
HadoopStorageConfiguration hadoopStorageConfiguration = new HadoopStorageConfiguration(conf);
return HiveMetaStoreClientHelper.ugiDoAs(
conf,
() -> HoodieTableMetaClient.builder()
.setConf(hadoopStorageConfiguration).setBasePath(hudiBasePath).build());
}
}
Loading

0 comments on commit ddb727f

Please sign in to comment.