Skip to content

Commit

Permalink
branch-3.0: [fix](hudi)Add hudi catalog read partition table partitio…
Browse files Browse the repository at this point in the history
…n prune #44669 (#45111)

Cherry-picked from #44669

Co-authored-by: daidai <[email protected]>
  • Loading branch information
github-actions[bot] and hubgeter authored Dec 6, 2024
1 parent 15c3807 commit 7df5c10
Show file tree
Hide file tree
Showing 7 changed files with 792 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
Expand All @@ -31,6 +32,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
Expand All @@ -41,6 +43,7 @@
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
Expand Down Expand Up @@ -302,7 +305,28 @@ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {

@Override
public boolean supportInternalPartitionPruned() {
return getDlaType() == DLAType.HIVE;
return getDlaType() == DLAType.HIVE || getDlaType() == DLAType.HUDI;
}

public SelectedPartitions initHudiSelectedPartitions(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() != DLAType.HUDI) {
return SelectedPartitions.NOT_PRUNED;
}

if (getPartitionColumns().isEmpty()) {
return SelectedPartitions.NOT_PRUNED;
}
TablePartitionValues tablePartitionValues = HudiUtils.getPartitionValues(tableSnapshot, this);

Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
Map<Long, String> idToNameMap = tablePartitionValues.getPartitionIdToNameMap();

Map<String, PartitionItem> nameToPartitionItems = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
nameToPartitionItems.put(idToNameMap.get(entry.getKey()), entry.getValue());
}

return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,35 @@

package org.apache.doris.datasource.hudi;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class HudiUtils {
Expand Down Expand Up @@ -231,4 +242,43 @@ private static Type handleUnionType(Schema avroSchema) {
}
return Type.UNSUPPORTED;
}

public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
TablePartitionValues partitionValues = new TablePartitionValues();
if (hmsTable.getPartitionColumns().isEmpty()) {
//isn't partition table.
return partitionValues;
}

HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();

if (tableSnapshot.isPresent()) {
if (tableSnapshot.get().getType() == TableSnapshot.VersionType.VERSION) {
// Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`";
return partitionValues;
}
String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", "");

partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getSnapshotPartitionValues(
hmsTable, hudiClient, queryInstant, useHiveSyncPartition));
} else {
HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
return partitionValues;
}
partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition));
}
return partitionValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
Expand All @@ -30,12 +31,10 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
Expand Down Expand Up @@ -70,7 +69,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -286,50 +284,29 @@ private boolean canUseNativeReader() {
return !sessionVariable.isForceJniScanner() && isCowTable;
}

private List<HivePartition> getPrunedPartitions(
HoodieTableMetaClient metaClient, Option<String> snapshotTimestamp) throws AnalysisException {
private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
TablePartitionValues partitionValues;
if (snapshotTimestamp.isPresent()) {
partitionValues = processor.getSnapshotPartitionValues(
hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
} else {
partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition);
}
if (partitionValues != null) {
// 2. prune partitions by expr
partitionValues.readLock().lock();
try {
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
partitionValues.getUidToPartitionRange(),
partitionValues.getRangeToId(),
partitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.selectedPartitionNum = filteredPartitionIds.size();
// 3. get partitions from cache
String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();
Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
Map<Long, List<String>> partitionValuesMap = partitionValues.getPartitionValuesMap();
return filteredPartitionIds.stream().map(id -> {
String path = basePath + "/" + partitionIdToNameMap.get(id);
return new HivePartition(
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id),
Maps.newHashMap());
}).collect(Collectors.toList());
} finally {
partitionValues.readLock().unlock();
}
}
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
Map<String, PartitionItem> prunedPartitions = selectedPartitions.selectedPartitions;
this.selectedPartitionNum = prunedPartitions.size();

String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();

List<HivePartition> hivePartitions = Lists.newArrayList();
prunedPartitions.forEach(
(key, value) -> {
String path = basePath + "/" + key;
hivePartitions.add(new HivePartition(
dbName, tblName, false, inputFormat, path,
((ListPartitionItem) value).getItems().get(0).getPartitionValuesAsStringList(),
Maps.newHashMap()));
}
);
return hivePartitions;
}
// unpartitioned table, create a dummy partition to save location and
// inputformat,
Expand Down Expand Up @@ -420,7 +397,7 @@ public List<Split> getSplits() throws UserException {
if (!partitionInit) {
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient));
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -482,7 +459,7 @@ public boolean isBatchMode() {
// Non partition table will get one dummy partition
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient));
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
HudiScanNode hudiScanNode = (HudiScanNode) scanNode;
hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected LogicalHudiScan(RelationId id, ExternalTable table, List<String> quali
public LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot,
((HMSExternalTable) table).initHudiSelectedPartitions(tableSnapshot), tableSample, tableSnapshot,
Optional.empty(), Optional.empty());
}

Expand Down
Loading

0 comments on commit 7df5c10

Please sign in to comment.