From f0c153c6b023488e9bf59f3d3bda5f3bdc5c7ebd Mon Sep 17 00:00:00 2001 From: daidai Date: Tue, 26 Nov 2024 13:22:25 +0800 Subject: [PATCH] hudi --- .../datasource/hive/source/HiveScanNode.java | 2 +- .../doris/datasource/hudi/HudiUtils.java | 61 +++++++++++++++++ .../datasource/hudi/source/HudiScanNode.java | 68 ++++++++----------- .../translator/PhysicalPlanTranslator.java | 2 + .../nereids/rules/analysis/BindRelation.java | 13 +++- .../rules/rewrite/PruneFileScanPartition.java | 12 ++-- .../trees/plans/logical/LogicalHudiScan.java | 5 +- 7 files changed, 112 insertions(+), 51 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index dbf1ea9cd9a4e42..dea8838245fa6e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -82,7 +82,7 @@ public class HiveScanNode extends FileQueryScanNode { // will only be set in Nereids, for lagency planner, it should be null @Setter - private SelectedPartitions selectedPartitions = null; + protected SelectedPartitions selectedPartitions = null; private boolean partitionInit = false; private final AtomicReference batchException = new AtomicReference<>(null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java index d7803b1a516f9e8..e4c67c439683ffa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java @@ -17,24 +17,38 @@ package org.apache.doris.datasource.hudi; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; +import org.apache.doris.datasource.hudi.source.HudiCachedPartitionProcessor; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class HudiUtils { @@ -231,4 +245,51 @@ private static Type handleUnionType(Schema avroSchema) { } return Type.UNSUPPORTED; } + + public static boolean isPartitionTable(HMSExternalTable hmsTable) { + return !hmsTable.getPartitionColumnTypes().isEmpty(); + } + + public static SelectedPartitions getAllPartitions(Optional tableSnapshot, + HMSExternalTable hmsTable) { + if (!isPartitionTable(hmsTable)) { + return SelectedPartitions.NOT_PRUNED; + } + Map idToPartitionItem = + getPartitionValues(tableSnapshot, hmsTable).getIdToPartitionItem(); + return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false); + } + + public static TablePartitionValues getPartitionValues(Optional tableSnapshot, + HMSExternalTable hmsTable) { + TablePartitionValues partitionValues = new TablePartitionValues(); + if (!isPartitionTable(hmsTable)) { + //isn't partition table. + return partitionValues; + } + + HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); + HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() + .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); + boolean useHiveSyncPartition = hmsTable.useHiveSyncPartition(); + + if (tableSnapshot.isPresent()) { + if (tableSnapshot.get().getType() == TableSnapshot.VersionType.VERSION) { + // Hudi does not support `FOR VERSION AS OF`, please use `FOR TIME AS OF`"; + return partitionValues; + } + String queryInstant = tableSnapshot.get().getTime().replaceAll("[-: ]", ""); + + partitionValues = processor.getSnapshotPartitionValues( + hmsTable, hudiClient, queryInstant, useHiveSyncPartition); + } else { + HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + Option snapshotInstant = timeline.lastInstant(); + if (!snapshotInstant.isPresent()) { + return partitionValues; + } + partitionValues = processor.getPartitionValues(hmsTable, hudiClient, useHiveSyncPartition); + } + return partitionValues; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index a8f2a362bfde8d0..4dc1cfc88cec9c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -34,7 +34,6 @@ import org.apache.doris.datasource.hive.HivePartition; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.HudiUtils; -import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -68,7 +67,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -263,47 +261,35 @@ private List getPrunedPartitions( HoodieTableMetaClient metaClient, Option snapshotTimestamp) throws AnalysisException { List partitionColumnTypes = hmsTable.getPartitionColumnTypes(); if (!partitionColumnTypes.isEmpty()) { - HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() - .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); - TablePartitionValues partitionValues; - if (snapshotTimestamp.isPresent()) { - partitionValues = processor.getSnapshotPartitionValues( - hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); - } else { - partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition); - } - if (partitionValues != null) { - // 2. prune partitions by expr - partitionValues.readLock().lock(); - try { - Map idToPartitionItem = partitionValues.getIdToPartitionItem(); - this.totalPartitionNum = idToPartitionItem.size(); - ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, - hmsTable.getPartitionColumns(), columnNameToRange, - partitionValues.getUidToPartitionRange(), - partitionValues.getRangeToId(), - partitionValues.getSingleColumnRangeMap(), - true); - Collection filteredPartitionIds = pruner.prune(); - this.selectedPartitionNum = filteredPartitionIds.size(); - // 3. get partitions from cache - String dbName = hmsTable.getDbName(); - String tblName = hmsTable.getName(); - String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); - String basePath = metaClient.getBasePathV2().toString(); - Map partitionIdToNameMap = partitionValues.getPartitionIdToNameMap(); - Map> partitionValuesMap = partitionValues.getPartitionValuesMap(); - return filteredPartitionIds.stream().map(id -> { - String path = basePath + "/" + partitionIdToNameMap.get(id); - return new HivePartition( - dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id), - Maps.newHashMap()); - }).collect(Collectors.toList()); - } finally { - partitionValues.readLock().unlock(); + this.totalPartitionNum = selectedPartitions.totalPartitionNum; + Map prunedPartitions = selectedPartitions.selectedPartitions; + this.selectedPartitionNum = prunedPartitions.size(); + + + // 3. get partitions from cache + String dbName = hmsTable.getDbName(); + String tblName = hmsTable.getName(); + String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); + String basePath = metaClient.getBasePathV2().toString(); + + TablePartitionValues tablePartitionValues = HudiUtils.getPartitionValues( + getQueryTableSnapshot() == null ? Optional.empty() : Optional.of(getQueryTableSnapshot()), + hmsTable); + Map partitionIdToNameMap = tablePartitionValues.getPartitionIdToNameMap(); + Map> partitionValuesMap = tablePartitionValues.getPartitionValuesMap(); + + List hivePartitions = Lists.newArrayList(); + prunedPartitions.forEach( + (key, value) -> { + String path = basePath + "/" + partitionIdToNameMap.get(key); + hivePartitions.add(new HivePartition( + dbName, tblName, false, inputFormat, path, partitionValuesMap.get(key), + Maps.newHashMap())); } - } + ); + return hivePartitions; } + // unpartitioned table, create a dummy partition to save location and inputformat, // so that we can unify the interface. HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a413d2745b27870..5f3e0c46e003702 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -651,6 +651,8 @@ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTransla PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, hudiScan.getScanParams(), hudiScan.getIncrementalRelation()); + HudiScanNode hudiScanNode = (HudiScanNode) scanNode; + hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions()); if (fileScan.getTableSnapshot().isPresent()) { ((FileQueryScanNode) scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 6e45fe7ecd92416..2fc6d423c1cc7e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -35,6 +35,8 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; +import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; +import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.nereids.CTEContext; import org.apache.doris.nereids.CascadesContext; @@ -75,6 +77,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; @@ -428,15 +431,19 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio return new LogicalSubQueryAlias<>(qualifiedTableName, hiveViewPlan); } if (hmsTable.getDlaType() == DLAType.HUDI) { + SelectedPartitions selectedPartitions = HiveMetaStoreClientHelper.ugiDoAs( + HiveMetaStoreClientHelper.getConfiguration(hmsTable), + () -> HudiUtils.getAllPartitions(unboundRelation.getTableSnapshot(), hmsTable)); LogicalHudiScan hudiScan = new LogicalHudiScan(unboundRelation.getRelationId(), hmsTable, qualifierWithoutTableName, unboundRelation.getTableSample(), - unboundRelation.getTableSnapshot()); + unboundRelation.getTableSnapshot(), + selectedPartitions); hudiScan = hudiScan.withScanParams(hmsTable, unboundRelation.getScanParams()); return hudiScan; } else { - return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, + return new LogicalFileScan(unboundRelation.getRelationId(), hmsTable, qualifierWithoutTableName, - ((HMSExternalTable) table).getAllPartitions(), + hmsTable.getAllPartitions(), unboundRelation.getTableSample(), unboundRelation.getTableSnapshot()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index c9f5459b594e87a..fc8bde09ed14e9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -60,16 +60,19 @@ public Rule build() { LogicalFileScan scan = filter.child(); ExternalTable tbl = scan.getTable(); - SelectedPartitions selectedPartitions; + SelectedPartitions selectedPartitions = null; // TODO(cmy): support other external table - if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) { + if (tbl instanceof HMSExternalTable) { HMSExternalTable hiveTbl = (HMSExternalTable) tbl; - selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext); + if (hiveTbl.getDlaType() == DLAType.HIVE || hiveTbl.getDlaType() == DLAType.HUDI) { + selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext); + } } else if (tbl instanceof MaxComputeExternalTable) { MaxComputeExternalTable maxComputeTbl = (MaxComputeExternalTable) tbl; selectedPartitions = pruneMaxComputePartitions(maxComputeTbl, filter, scan, ctx.cascadesContext); - } else { + } + if (selectedPartitions == null) { // set isPruned so that it won't go pass the partition prune again selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true); } @@ -79,6 +82,7 @@ public Rule build() { }).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE); } + //prune Hive and Hudi partitions. private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java index 629690889432b37..285819103af8b82 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java @@ -82,9 +82,10 @@ protected LogicalHudiScan(RelationId id, ExternalTable table, List quali } public LogicalHudiScan(RelationId id, ExternalTable table, List qualifier, - Optional tableSample, Optional tableSnapshot) { + Optional tableSample, Optional tableSnapshot, + SelectedPartitions selectedPartitions) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot, + selectedPartitions, tableSample, tableSnapshot, Optional.empty(), Optional.empty()); }