Commit f0c153c
hudi
hubgeter committed Nov 26, 2024
1 parent 18771cc commit f0c153c
Showing 7 changed files with 112 additions and 51 deletions.
@@ -82,7 +82,7 @@ public class HiveScanNode extends FileQueryScanNode {

// will only be set in Nereids; for the legacy planner it should be null
@Setter
private SelectedPartitions selectedPartitions = null;
protected SelectedPartitions selectedPartitions = null;

private boolean partitionInit = false;
private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
@@ -17,24 +17,38 @@

package org.apache.doris.datasource.hudi;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class HudiUtils {
@@ -231,4 +245,51 @@ private static Type handleUnionType(Schema avroSchema) {
}
return Type.UNSUPPORTED;
}

public static boolean isPartitionTable(HMSExternalTable hmsTable) {
return !hmsTable.getPartitionColumnTypes().isEmpty();
}

public static SelectedPartitions getAllPartitions(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
if (!isPartitionTable(hmsTable)) {
return SelectedPartitions.NOT_PRUNED;
}
Map<Long, PartitionItem> idToPartitionItem =
getPartitionValues(tableSnapshot, hmsTable).getIdToPartitionItem();
return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false);
}

public static TablePartitionValues getPartitionValues(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
TablePartitionValues partitionValues = new TablePartitionValues();
if (!isPartitionTable(hmsTable)) {
// not a partitioned table; return empty partition values
return partitionValues;
}

HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition();

if (tableSnapshot.isPresent()) {
if (tableSnapshot.get().getType() == TableSnapshot.VersionType.VERSION) {
// Hudi does not support `FOR VERSION AS OF`; please use `FOR TIME AS OF`
return partitionValues;
}
String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", "");
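// e.g. "2024-11-26 12:34:56" -> "20241126123456", the second-precision prefix of a Hudi instant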

partitionValues = processor.getSnapshotPartitionValues(
hmsTable, hudiClient, queryInstant, useHiveSyncPartition);
} else {
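// no snapshot requested: make sure at least one completed instant exists before reading the latest view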
HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
return partitionValues;
}
partitionValues = processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition);
}
return partitionValues;
}
}
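For context, a minimal sketch of how the two helpers above are meant to be called from the planner. The wrapper class and method below are hypothetical; `hmsTable` is assumed to be an already-resolved Hudi `HMSExternalTable`, and the empty snapshot models a query without a `FOR TIME AS OF` clause:

import java.util.Optional;

import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;

// Hypothetical caller, sketching the intended use of HudiUtils.getAllPartitions.
public final class HudiPartitionLookupSketch {
    public static SelectedPartitions allPartitions(HMSExternalTable hmsTable) {
        // No FOR TIME AS OF clause in the query: read the latest completed instant.
        Optional<TableSnapshot> snapshot = Optional.empty();
        SelectedPartitions partitions = HudiUtils.getAllPartitions(snapshot, hmsTable);
        // Unpartitioned tables come back as SelectedPartitions.NOT_PRUNED; partitioned
        // tables carry every partition with the pruned flag still false.
        return partitions;
    }
}

The `false` flag is what later lets the FILE_SCAN_PARTITION_PRUNE rewrite rule run over these partitions exactly once.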
@@ -34,7 +34,6 @@
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@@ -68,7 +67,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -263,47 +261,35 @@ private List<HivePartition> getPrunedPartitions(
HoodieTableMetaClient metaClient, Option<String> snapshotTimestamp) throws AnalysisException {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv()
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
TablePartitionValues partitionValues;
if (snapshotTimestamp.isPresent()) {
partitionValues = processor.getSnapshotPartitionValues(
hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
} else {
partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition);
}
if (partitionValues != null) {
// 2. prune partitions by expr
partitionValues.readLock().lock();
try {
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
hmsTable.getPartitionColumns(), columnNameToRange,
partitionValues.getUidToPartitionRange(),
partitionValues.getRangeToId(),
partitionValues.getSingleColumnRangeMap(),
true);
Collection<Long> filteredPartitionIds = pruner.prune();
this.selectedPartitionNum = filteredPartitionIds.size();
// 3. get partitions from cache
String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();
Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
Map<Long, List<String>> partitionValuesMap = partitionValues.getPartitionValuesMap();
return filteredPartitionIds.stream().map(id -> {
String path = basePath + "/" + partitionIdToNameMap.get(id);
return new HivePartition(
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id),
Maps.newHashMap());
}).collect(Collectors.toList());
} finally {
partitionValues.readLock().unlock();
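// 2. use the partitions already pruned by the Nereids rewrite rule (FILE_SCAN_PARTITION_PRUNE)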
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
Map<Long, PartitionItem> prunedPartitions = selectedPartitions.selectedPartitions;
this.selectedPartitionNum = prunedPartitions.size();

// 3. get partitions from cache
String dbName = hmsTable.getDbName();
String tblName = hmsTable.getName();
String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
String basePath = metaClient.getBasePathV2().toString();

TablePartitionValues tablePartitionValues = HudiUtils.getPartitionValues(
getQueryTableSnapshot() == null ? Optional.empty() : Optional.of(getQueryTableSnapshot()),
hmsTable);
Map<Long, String> partitionIdToNameMap = tablePartitionValues.getPartitionIdToNameMap();
Map<Long, List<String>> partitionValuesMap = tablePartitionValues.getPartitionValuesMap();

List<HivePartition> hivePartitions = Lists.newArrayList();
prunedPartitions.forEach(
(key, value) -> {
String path = basePath + "/" + partitionIdToNameMap.get(key);
hivePartitions.add(new HivePartition(
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(key),
Maps.newHashMap()));
}
}
);
return hivePartitions;
}

// For an unpartitioned table, create a dummy partition to save the location and input format,
// so that we can unify the interface.
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
@@ -651,6 +651,8 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla
PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
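// hand the partitions pruned during rewrite (FILE_SCAN_PARTITION_PRUNE) to the scan node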
HudiScanNode hudiScanNode = (HudiScanNode) scanNode;
hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSnapshot().isPresent()) {
((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
}
@@ -35,6 +35,8 @@
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.nereids.CTEContext;
import org.apache.doris.nereids.CascadesContext;
@@ -75,6 +77,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
@@ -428,15 +431,19 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
return new LogicalSubQueryAlias<>(qualifiedTableName, hiveViewPlan);
}
if (hmsTable.getDlaType() == DLAType.HUDI) {
SelectedPartitions selectedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> HudiUtils.getAllPartitions(unboundRelation.getTableSnapshot(), hmsTable));
LogicalHudiScan hudiScan = new LogicalHudiScan(unboundRelation.getRelationId(), hmsTable,
qualifierWithoutTableName, unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
unboundRelation.getTableSnapshot(),
selectedPartitions);
hudiScan = hudiScan.withScanParams(hmsTable, unboundRelation.getScanParams());
return hudiScan;
} else {
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
return new LogicalFileScan(unboundRelation.getRelationId(), hmsTable,
qualifierWithoutTableName,
((HMSExternalTable) table).getAllPartitions(),
hmsTable.getAllPartitions(),
unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
}
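Note that the partition listing above runs inside `HiveMetaStoreClientHelper.ugiDoAs`, so metastore and filesystem access happens as the catalog's configured user. Below is a minimal sketch of that proxy-user pattern using Hadoop's `UserGroupInformation` directly; it illustrates the idea rather than the body of Doris's helper, and the class and method names are hypothetical:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

// Hypothetical sketch of the proxy-user pattern behind ugiDoAs-style helpers.
public final class UgiDoAsSketch {
    public static <T> T doAs(String proxyUser, PrivilegedExceptionAction<T> action) throws Exception {
        // Impersonate proxyUser on top of the process's login user, then run the action.
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(
                proxyUser, UserGroupInformation.getLoginUser());
        return ugi.doAs(action);
    }
}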
@@ -60,16 +60,19 @@ public Rule build() {
LogicalFileScan scan = filter.child();
ExternalTable tbl = scan.getTable();

SelectedPartitions selectedPartitions;
SelectedPartitions selectedPartitions = null;
// TODO(cmy): support other external tables
if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) {
if (tbl instanceof HMSExternalTable) {
HMSExternalTable hiveTbl = (HMSExternalTable) tbl;
selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext);
if (hiveTbl.getDlaType() == DLAType.HIVE || hiveTbl.getDlaType() == DLAType.HUDI) {
selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext);
}
} else if (tbl instanceof MaxComputeExternalTable) {
MaxComputeExternalTable maxComputeTbl = (MaxComputeExternalTable) tbl;
selectedPartitions = pruneMaxComputePartitions(maxComputeTbl, filter,
scan, ctx.cascadesContext);
} else {
}
if (selectedPartitions == null) {
// set isPruned so that it won't go through the partition pruning again
selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true);
}
@@ -79,6 +82,7 @@ public Rule build() {
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
}

// Prune Hive and Hudi partitions.
private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) {
Map<Long, PartitionItem> selectedPartitionItems = Maps.newHashMap();
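The rule above and `HudiUtils.getAllPartitions` between them use exactly two `SelectedPartitions` states. A small sketch collecting both constructor calls, copied verbatim from this commit (the wrapper class and method names are mine):

import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;

// Sketch: the two SelectedPartitions states this commit passes around.
public final class SelectedPartitionsStates {
    // As built by HudiUtils.getAllPartitions: every partition selected, pruning still pending.
    public static SelectedPartitions unpruned(Map<Long, PartitionItem> idToPartitionItem) {
        return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false);
    }

    // As built by the rule's fallback: empty and flagged pruned, so the rule won't fire again.
    public static SelectedPartitions prunedEmpty() {
        return new SelectedPartitions(0, ImmutableMap.of(), true);
    }
}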
@@ -82,9 +82,10 @@ protected LogicalHudiScan(RelationId id, ExternalTable table, List<String> quali
}

public LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot,
SelectedPartitions selectedPartitions) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot,
selectedPartitions, tableSample, tableSnapshot,
Optional.empty(), Optional.empty());
}
