Commit

mc && hudi
hubgeter committed Nov 26, 2024
1 parent 6ce0208 commit 832effc
Showing 19 changed files with 1,026 additions and 72 deletions.
@@ -377,10 +377,11 @@ public TableIndexes getTableIndexes() {
* @return the selected partitions, or SelectedPartitions.NOT_PRUNED if internal pruning is unsupported or the table is unpartitioned
*/
public SelectedPartitions initSelectedPartitions(OptionalLong snapshotId) {
if (!supportPartitionPruned()) {
if (!supportInternalPartitionPruned()) {
return SelectedPartitions.NOT_PRUNED;
}
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshotId))) {
// not a partitioned table.
return SelectedPartitions.NOT_PRUNED;
}
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshotId);
@@ -410,11 +411,12 @@ public List<Column> getPartitionColumns(OptionalLong snapshotId) {
}

/**
* Does it support partition pruning? If so, this method needs to be overridden in subclasses
* Does it support internal partition pruning? If so, this method needs to be overridden in subclasses.
* Internal partition pruning: implement partition pruning logic without relying on external APIs.
*
* @return true if internal partition pruning is supported; false by default
*/
public boolean supportPartitionPruned() {
public boolean supportInternalPartitionPruned() {
return false;
}
}
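Taken together, the base-class contract is: a subclass that can compute partition items from Doris-side metadata overrides supportInternalPartitionPruned() and supplies the name-to-item map that initSelectedPartitions() prunes. A minimal sketch of that pattern, assuming a hypothetical subclass (the override bodies below are illustrative, not part of this commit; Map, OptionalLong, and Collections imports assumed):

// Inside a hypothetical ExternalTable subclass (constructor and other members omitted):

@Override
public boolean supportInternalPartitionPruned() {
    // Opt in: partition items come from Doris-side metadata, not a remote API.
    return true;
}

@Override
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
    // Supply name -> PartitionItem; initSelectedPartitions() prunes this map
    // and wraps the survivors in SelectedPartitions.
    return Collections.emptyMap(); // placeholder for a cache-backed lookup
}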
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.hive;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
@@ -31,6 +32,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVBaseTableIf;
@@ -40,6 +42,7 @@
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@@ -302,10 +305,33 @@ public List<Column> getPartitionColumns(OptionalLong snapshotId) {
}

@Override
public boolean supportPartitionPruned() {
return getDlaType() == DLAType.HIVE;
public boolean supportInternalPartitionPruned() {
return getDlaType() == DLAType.HIVE || getDlaType() == DLAType.HUDI;
}

public SelectedPartitions initHudiSelectedPartitions(Optional<TableSnapshot> tableSnapshot) {
if (getDlaType() != DLAType.HUDI) {
return SelectedPartitions.NOT_PRUNED;
}

if (getPartitionColumns().isEmpty()) {
return SelectedPartitions.NOT_PRUNED;
}
TablePartitionValues tablePartitionValues = HudiUtils.getPartitionValues(tableSnapshot, this);

Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
Map<Long, String> idToNameMap = tablePartitionValues.getPartitionIdToNameMap();

Map<String, PartitionItem> nameToPartitionItems = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
nameToPartitionItems.put(idToNameMap.get(entry.getKey()), entry.getValue());
}

return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
}

@Override
public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
return getNameToPartitionItems();
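The SelectedPartitions returned above carries every partition with the pruned flag set to false, signaling that Nereids still has to apply predicate-based pruning. A sketch of the expected call site, which is assumed here and not shown in this diff (Optional, TableSnapshot, and SelectedPartitions imports assumed):

// Assumed call-site sketch: the planner asks the table for its Hudi partitions.
public SelectedPartitions pruneForHudi(HMSExternalTable table,
                                       Optional<TableSnapshot> snapshot) {
    // For HUDI tables this loads all partition values (optionally as of a
    // snapshot time) and hands them to the optimizer unpruned (flag = false).
    return table.initHudiSelectedPartitions(snapshot);
}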
@@ -82,7 +82,7 @@ public class HiveScanNode extends FileQueryScanNode {

// will only be set in Nereids; for the legacy planner, it should be null
@Setter
private SelectedPartitions selectedPartitions = null;
protected SelectedPartitions selectedPartitions = null;

private boolean partitionInit = false;
private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
@@ -17,24 +17,35 @@

package org.apache.doris.datasource.hudi;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class HudiUtils {
@@ -231,4 +242,44 @@ private static Type handleUnionType(Schema avroSchema) {
}
return Type.UNSUPPORTED;
}

public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
TablePartitionValues partitionValues = new TablePartitionValues();
if (hmsTable.getPartitionColumns().isEmpty()) {
// not a partitioned table.
return partitionValues;
}

HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();

if (tableSnapshot.isPresent()) {
if (tableSnapshot.get().getType() == TableSnapshot.VersionType.VERSION) {
// Hudi does not support FOR VERSION AS OF; use FOR TIME AS OF instead.
return partitionValues;
}
String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", "");

partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getSnapshotPartitionValues(
hmsTable, hudiClient, queryInstant, useHiveSyncPartition));
} else {
HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
return partitionValues;
}
partitionValues =
HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition));
}
return partitionValues;
}

}
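The queryInstant normalization above strips dashes, colons, and spaces so that a SQL-level timestamp matches Hudi's compact instant format (e.g. yyyyMMddHHmmss). A worked example of that single line, with an assumed input value:

// Worked example of the instant normalization (input value is illustrative).
String time = "2024-11-26 12:34:56";                // from FOR TIME AS OF
String queryInstant = time.replaceAll("[-: ]", ""); // -> "20241126123456"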
@@ -21,6 +21,7 @@
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
@@ -29,12 +30,10 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -68,7 +67,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -259,50 +257,29 @@ private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

private List<HivePartition> getPrunedPartitions(
HoodieTableMetaClient metaClient, Option<String> snapshotTimestamp) throws AnalysisException {
private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
TablePartitionValues partitionValues;
if (snapshotTimestamp.isPresent()) {
partitionValues = processor.getSnapshotPartitionValues(
hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
} else {
partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition);
}
if (partitionValues != null) {
// 2. prune partitions by expr
partitionValues.readLock().lock();
try {
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
partitionValues.getUidToPartitionRange(),
partitionValues.getRangeToId(),
partitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.selectedPartitionNum = filteredPartitionIds.size();
// 3. get partitions from cache
String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();
Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
Map<Long, List<String>> partitionValuesMap = partitionValues.getPartitionValuesMap();
return filteredPartitionIds.stream().map(id -> {
String path = basePath + "/" + partitionIdToNameMap.get(id);
return new HivePartition(
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id),
Maps.newHashMap());
}).collect(Collectors.toList());
} finally {
partitionValues.readLock().unlock();
}
}
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
Map<String, PartitionItem> prunedPartitions = selectedPartitions.selectedPartitions;
this.selectedPartitionNum = prunedPartitions.size();

String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();

List<HivePartition> hivePartitions = Lists.newArrayList();
prunedPartitions.forEach(
(key, value) -> {
String path = basePath + "/" + key;
hivePartitions.add(new HivePartition(
dbName, tblName, false, inputFormat, path,
((ListPartitionItem) value).getItems().get(0).getPartitionValuesAsStringList(),
Maps.newHashMap()));
}
);
return hivePartitions;
}
// Unpartitioned table: create a dummy partition to store the location and input format,
// so that we can unify the interface.
@@ -392,7 +369,7 @@ public List<Split> getSplits() throws UserException {
if (!partitionInit) {
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient));
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
@@ -454,7 +431,7 @@ public boolean isBatchMode() {
// A non-partitioned table will get one dummy partition
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
() -> getPrunedPartitions(hudiClient));
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
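In the rewritten getPrunedPartitions, each selected partition name doubles as the partition's relative storage path under the table's base path, so the name-to-HivePartition mapping is mechanical. A worked example of that step, with hypothetical values:

// Hypothetical values showing the name -> path mapping used above.
String basePath = "hdfs://nn:8020/warehouse/db.db/tbl"; // metaClient.getBasePathV2()
String partitionName = "dt=2024-11-26/hour=12";         // key in selectedPartitions
String path = basePath + "/" + partitionName;
// -> "hdfs://nn:8020/warehouse/db.db/tbl/dt=2024-11-26/hour=12"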
@@ -21,6 +21,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
@@ -50,7 +51,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

/**
Expand All @@ -71,6 +74,14 @@ protected synchronized void makeSureInitialized() {
}
}

public boolean supportInternalPartitionPruned() {
return true;
}

public List<Column> getPartitionColumns(OptionalLong snapshotId) {
return getPartitionColumns();
}

public List<Column> getPartitionColumns() {
makeSureInitialized();
@@ -79,6 +90,23 @@ public List<Column> getPartitionColumns() {
.orElse(Collections.emptyList());
}

public Map<String, PartitionItem> getNameToPartitionItems(OptionalLong snapshotId) {
if (getPartitionColumns().isEmpty()) {
return Collections.emptyMap();
}

TablePartitionValues tablePartitionValues = getPartitionValues();

Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
Map<Long, String> idToNameMap = tablePartitionValues.getPartitionIdToNameMap();

Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
nameToPartitionItem.put(idToNameMap.get(entry.getKey()), entry.getValue());
}
return nameToPartitionItem;
}

public TablePartitionValues getPartitionValues() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
@@ -110,6 +138,8 @@ private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue sche

/**
* Parse all values from partitionPath into a single list.
* In MaxCompute, partition values may contain the special characters _$#.!@
* Ref: MaxCompute error code ODPS-0130071, "Invalid partition value".
*
* @param partitionColumns the partition columns, e.g. part1, part2, part3...
* @param partitionPath the partition path, in a format like 'part1=123/part2=abc/part3=1bc'
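A minimal sketch of the parsing this javadoc describes, assuming each path segment is 'name=value' and splitting only on the first '=' so values keep the documented special characters (the method below is illustrative, not the committed implementation; java.util.ArrayList and java.util.List imports assumed):

// Illustrative sketch: "part1=123/part2=abc/part3=1bc" -> ["123", "abc", "1bc"].
static List<String> parsePartitionValues(String partitionPath) {
    List<String> values = new ArrayList<>();
    for (String segment : partitionPath.split("/")) {
        // Split on the first '=' only, so values may contain _$#.!@ or '='.
        int eq = segment.indexOf('=');
        values.add(segment.substring(eq + 1));
    }
    return values;
}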