Skip to content

Commit

Permalink
[enhance](catalog)External partition prune return partitionName inste…
Browse files Browse the repository at this point in the history
…ad of partitionId (#44415)

The partition ID of external data sources is meaningless, and some data
sources only have partition names, so the return result of partition
pruning is replaced with name instead of ID
  • Loading branch information
zddr authored and Your Name committed Nov 29, 2024
1 parent 82c7a9d commit d8af9e5
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,13 @@ public SelectedPartitions getAllPartitions() {
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
this.getDbName(), this.getName(), partitionColumnTypes);
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();

return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false);
// transfer id to name
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue());
}
return new SelectedPartitions(idToPartitionItem.size(), nameToPartitionItem, false);
}

public boolean isHiveTransactionalTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,25 @@
import java.util.stream.IntStream;

/** OneListPartitionInputs */
public class OneListPartitionEvaluator
extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator {
private final long partitionId;
public class OneListPartitionEvaluator<K>
extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final List<Slot> partitionSlots;
private final ListPartitionItem partitionItem;
private final ExpressionRewriteContext expressionRewriteContext;

public OneListPartitionEvaluator(long partitionId, List<Slot> partitionSlots,
public OneListPartitionEvaluator(K partitionIdent, List<Slot> partitionSlots,
ListPartitionItem partitionItem, CascadesContext cascadesContext) {
this.partitionId = partitionId;
this.partitionIdent = partitionIdent;
this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null");
this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null");
this.expressionRewriteContext = new ExpressionRewriteContext(
Objects.requireNonNull(cascadesContext, "cascadesContext cannot be null"));
}

@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.Map;

/** the evaluator of the partition which represent one partition */
public interface OnePartitionEvaluator {
long getPartitionId();
public interface OnePartitionEvaluator<K> {
K getPartitionIdent();

/**
* return a slot to expression mapping to replace the input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@
*
* you can see the process steps in the comment of PartitionSlotInput.columnRanges
*/
public class OneRangePartitionEvaluator
public class OneRangePartitionEvaluator<K>
extends ExpressionVisitor<EvaluateRangeResult, EvaluateRangeInput>
implements OnePartitionEvaluator {
private final long partitionId;
implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final List<Slot> partitionSlots;
private final RangePartitionItem partitionItem;
private final ExpressionRewriteContext expressionRewriteContext;
Expand All @@ -95,9 +95,9 @@ public class OneRangePartitionEvaluator
private final Map<Slot, PartitionSlotType> slotToType;

/** OneRangePartitionEvaluator */
public OneRangePartitionEvaluator(long partitionId, List<Slot> partitionSlots,
public OneRangePartitionEvaluator(K partitionIdent, List<Slot> partitionSlots,
RangePartitionItem partitionItem, CascadesContext cascadesContext, int expandThreshold) {
this.partitionId = partitionId;
this.partitionIdent = partitionIdent;
this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null");
this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null");
this.expressionRewriteContext = new ExpressionRewriteContext(
Expand Down Expand Up @@ -155,8 +155,8 @@ public OneRangePartitionEvaluator(long partitionId, List<Slot> partitionSlots,
}

@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,21 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context)
}

/** prune */
public List<Long> prune() {
Builder<Long> scanPartitionIds = ImmutableList.builder();
public <K> List<K> prune() {
Builder<K> scanPartitionIdents = ImmutableList.builder();
for (OnePartitionEvaluator partition : partitions) {
if (!canBePrunedOut(partition)) {
scanPartitionIds.add(partition.getPartitionId());
scanPartitionIdents.add((K) partition.getPartitionIdent());
}
}
return scanPartitionIds.build();
return scanPartitionIdents.build();
}

/**
* prune partition with `idToPartitions` as parameter.
*/
public static List<Long> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<Long, PartitionItem> idToPartitions, CascadesContext cascadesContext,
public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<K, PartitionItem> idToPartitions, CascadesContext cascadesContext,
PartitionTableType partitionTableType) {
partitionPredicate = PartitionPruneExpressionExtractor.extract(
partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext);
Expand All @@ -135,7 +135,7 @@ public static List<Long> prune(List<Slot> partitionSlots, Expression partitionPr
}

List<OnePartitionEvaluator> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size());
for (Entry<Long, PartitionItem> kv : idToPartitions.entrySet()) {
for (Entry<K, PartitionItem> kv : idToPartitions.entrySet()) {
evaluators.add(toPartitionEvaluator(
kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold));
}
Expand All @@ -147,7 +147,7 @@ public static List<Long> prune(List<Slot> partitionSlots, Expression partitionPr
/**
* convert partition item to partition evaluator
*/
public static final OnePartitionEvaluator toPartitionEvaluator(long id, PartitionItem partitionItem,
public static final <K> OnePartitionEvaluator<K> toPartitionEvaluator(K id, PartitionItem partitionItem,
List<Slot> partitionSlots, CascadesContext cascadesContext, int expandThreshold) {
if (partitionItem instanceof ListPartitionItem) {
return new OneListPartitionEvaluator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@
import java.util.Map;

/** UnknownPartitionEvaluator */
public class UnknownPartitionEvaluator implements OnePartitionEvaluator {
private final long partitionId;
public class UnknownPartitionEvaluator<K> implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final PartitionItem partitionItem;

public UnknownPartitionEvaluator(long partitionId, PartitionItem partitionItem) {
this.partitionId = partitionId;
public UnknownPartitionEvaluator(K partitionId, PartitionItem partitionItem) {
this.partitionIdent = partitionId;
this.partitionItem = partitionItem;
}

@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Rule build() {

private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) {
Map<Long, PartitionItem> selectedPartitionItems = Maps.newHashMap();
Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap();
if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) {
// non partitioned table, return NOT_PRUNED.
// non partition table will be handled in HiveScanNode.
Expand All @@ -91,13 +91,13 @@ private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
.map(column -> scanOutput.get(column.getName().toLowerCase()))
.collect(Collectors.toList());

Map<Long, PartitionItem> idToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
List<Long> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE));
Map<String, PartitionItem> nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
List<String> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE));

for (Long id : prunedPartitions) {
selectedPartitionItems.put(id, idToPartitionItem.get(id));
for (String name : prunedPartitions) {
selectedPartitionItems.put(name, nameToPartitionItem.get(name));
}
return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true);
return new SelectedPartitions(nameToPartitionItem.size(), selectedPartitionItems, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ public static class SelectedPartitions {
*/
public final long totalPartitionNum;
/**
* partition id -> partition item
* partition name -> partition item
*/
public final Map<Long, PartitionItem> selectedPartitions;
public final Map<String, PartitionItem> selectedPartitions;
/**
* true means the result is after partition pruning
* false means the partition pruning is not processed.
Expand All @@ -159,7 +159,7 @@ public static class SelectedPartitions {
/**
* Constructor for SelectedPartitions.
*/
public SelectedPartitions(long totalPartitionNum, Map<Long, PartitionItem> selectedPartitions,
public SelectedPartitions(long totalPartitionNum, Map<String, PartitionItem> selectedPartitions,
boolean isPruned) {
this.totalPartitionNum = totalPartitionNum;
this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions,
Expand Down

0 comments on commit d8af9e5

Please sign in to comment.