Commit
[fix](maxcompute)add mc catalog read partition table partition prune (apache#44508)

### What problem does this PR solve?

Based on the `requiredPartitions` interface of the MaxCompute API, this PR
implements partition pruning when reading partitioned tables, so queries no
longer have to scan the entire table.
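
At a high level, the partition names that survive the planner's partition pruning are converted into MaxCompute `PartitionSpec` objects and handed to the table read session through `requiredPartitions`. A minimal, self-contained sketch of that conversion (the partition names here are made up; only `new PartitionSpec(String)` and the `requiredPartitions(...)` builder call in the diff below come from this PR):

```java
import com.aliyun.odps.PartitionSpec;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionPruneSketch {
    public static void main(String[] args) {
        // Partition names that survived pruning (illustrative values;
        // the real names come from SelectedPartitions in the planner).
        List<String> selectedPartitionNames =
                Arrays.asList("dt=20241203,region=cn", "dt=20241203,region=us");

        // Each name parses into a MaxCompute PartitionSpec, the type that
        // the read session's requiredPartitions(...) expects. Passing an
        // empty list means "read all partitions".
        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
        for (String name : selectedPartitionNames) {
            requiredPartitionSpecs.add(new PartitionSpec(name));
        }
        System.out.println(requiredPartitionSpecs);
    }
}
```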
hubgeter committed Dec 3, 2024
1 parent 6e04545 commit 0ed601b
Showing 11 changed files with 497 additions and 22 deletions.
@@ -377,7 +377,7 @@ public TableIndexes getTableIndexes() {
      * @return
      */
     public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) {
-        if (!supportPartitionPruned()) {
+        if (!supportInternalPartitionPruned()) {
             return SelectedPartitions.NOT_PRUNED;
         }
         if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) {
@@ -394,7 +394,7 @@ public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot
      * @param snapshot if not support mvcc, ignore this
      * @return partitionName ==> PartitionItem
      */
-    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return Collections.emptyMap();
     }

@@ -410,11 +410,12 @@ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
     }

     /**
-     * Does it support partition cpruned, If so, this method needs to be overridden in subclasses
+     * Does it support Internal partition pruned, If so, this method needs to be overridden in subclasses
+     * Internal partition pruned : Implement partition pruning logic without relying on external APIs.
      *
      * @return
      */
-    public boolean supportPartitionPruned() {
+    public boolean supportInternalPartitionPruned() {
         return false;
     }
 }
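
Together, these two hooks form the opt-in contract for internal pruning. A condensed, non-compilable sketch of the overrides a subclass provides (mirroring the MaxCompute implementation further down; `loadNameToPartitionItems()` is a hypothetical stand-in for the table-specific loading logic):

```java
// Sketch of a subclass opting in to internal partition pruning.
@Override
public boolean supportInternalPartitionPruned() {
    return true; // this table type can prune without an external API
}

@Override
protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
    // Expose partitionName -> PartitionItem so the planner can evaluate
    // partition predicates against each item.
    return loadNameToPartitionItems(); // hypothetical helper
}
```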
@@ -301,12 +301,12 @@ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
     }

     @Override
-    public boolean supportPartitionPruned() {
+    public boolean supportInternalPartitionPruned() {
         return getDlaType() == DLAType.HIVE;
     }

     @Override
-    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return getNameToPartitionItems();
     }

@@ -82,7 +82,7 @@ public class HiveScanNode extends FileQueryScanNode {

     // will only be set in Nereids, for lagency planner, it should be null
     @Setter
-    private SelectedPartitions selectedPartitions = null;
+    protected SelectedPartitions selectedPartitions = null;

     private boolean partitionInit = false;
     private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
@@ -21,13 +21,15 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.thrift.TMCTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -50,6 +52,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.stream.Collectors;

@@ -71,6 +74,15 @@ protected synchronized void makeSureInitialized() {
         }
     }

+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns();
+    }

     public List<Column> getPartitionColumns() {
         makeSureInitialized();
@@ -79,7 +91,24 @@ public List<Column> getPartitionColumns() {
                 .orElse(Collections.emptyList());
     }

-    public TablePartitionValues getPartitionValues() {
+    @Override
+    protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+        if (getPartitionColumns().isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        TablePartitionValues tablePartitionValues = getPartitionValues();
+        Map<Long, PartitionItem> idToPartitionItem = tablePartitionValues.getIdToPartitionItem();
+        Map<Long, String> idToNameMap = tablePartitionValues.getPartitionIdToNameMap();
+
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
+        for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
+            nameToPartitionItem.put(idToNameMap.get(entry.getKey()), entry.getValue());
+        }
+        return nameToPartitionItem;
+    }
+
+    private TablePartitionValues getPartitionValues() {
         makeSureInitialized();
         Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
         if (!schemaCacheValue.isPresent()) {
@@ -110,6 +139,8 @@ private TablePartitionValues loadPartitionValues(MaxComputeSchemaCacheValue sche

     /**
      * parse all values from partitionPath to a single list.
+     * In MaxCompute : Support special characters : _$#.!@
+     * Ref : MaxCompute Error Code: ODPS-0130071 Invalid partition value.
      *
      * @param partitionColumns partitionColumns can contain the part1,part2,part3...
      * @param partitionPath partitionPath format is like the 'part1=123/part2=abc/part3=1bc'
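
For context, a standalone sketch of parsing that 'part1=123/part2=abc/part3=1bc' path shape (a hypothetical helper, not code from this commit):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionPathSketch {
    // Parse "part1=123/part2=abc/part3=1bc" into its values, in order.
    // Splitting only on the first '=' keeps special characters such as
    // _$#.!@ (allowed by MaxCompute) intact inside values.
    static List<String> parsePartitionValues(String partitionPath) {
        List<String> values = new ArrayList<>();
        for (String part : partitionPath.split("/")) {
            values.add(part.substring(part.indexOf('=') + 1));
        }
        return values;
    }

    public static void main(String[] args) {
        // Prints: [123, abc, 1bc]
        System.out.println(parsePartitionValues("part1=123/part2=abc/part3=1bc"));
    }
}
```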
@@ -40,6 +40,7 @@
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
 import org.apache.doris.datasource.property.constants.MCProperties;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.nereids.util.DateUtils;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
@@ -50,6 +51,7 @@
 import org.apache.doris.thrift.TTableFormatFileDesc;

 import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.table.TableIdentifier;
 import com.aliyun.odps.table.configuration.ArrowOptions;
 import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
@@ -60,6 +62,7 @@
 import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
 import com.google.common.collect.Maps;
 import jline.internal.Log;
+import lombok.Setter;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -86,14 +89,28 @@ public class MaxComputeScanNode extends FileQueryScanNode {
     private static final LocationPath ROW_OFFSET_PATH = new LocationPath("/row_offset", Maps.newHashMap());
     private static final LocationPath BYTE_SIZE_PATH = new LocationPath("/byte_size", Maps.newHashMap());

+    @Setter
+    private SelectedPartitions selectedPartitions = null;
+
+    // For new planner
+    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
+            SelectedPartitions selectedPartitions, boolean needCheckColumnPriv) {
+        this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
+                selectedPartitions, needCheckColumnPriv);
+    }
+
+    // For old planner
     public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
-        this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv);
+        this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
+                SelectedPartitions.NOT_PRUNED, needCheckColumnPriv);
     }

-    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
-            StatisticalType statisticalType, boolean needCheckColumnPriv) {
+    private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
+            StatisticalType statisticalType, SelectedPartitions selectedPartitions,
+            boolean needCheckColumnPriv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
         table = (MaxComputeExternalTable) desc.getTable();
+        this.selectedPartitions = selectedPartitions;
     }

     @Override
@@ -117,10 +134,27 @@ private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeS
         rangeDesc.setSize(maxComputeSplit.getLength());
     }

-    void createTableBatchReadSession() throws UserException {
+    // Return false if no need to read any partition data.
+    // Return true if need to read partition data.
+    boolean createTableBatchReadSession() throws UserException {
         List<String> requiredPartitionColumns = new ArrayList<>();
         List<String> orderedRequiredDataColumns = new ArrayList<>();

+        List<PartitionSpec> requiredPartitionSpecs = new ArrayList<>();
+        //if requiredPartitionSpecs is empty, get all partition data.
+        if (!table.getPartitionColumns().isEmpty() && selectedPartitions != SelectedPartitions.NOT_PRUNED) {
+            this.totalPartitionNum = selectedPartitions.totalPartitionNum;
+            this.selectedPartitionNum = selectedPartitions.selectedPartitions.size();
+
+            if (selectedPartitions.selectedPartitions.isEmpty()) {
+                //no need read any partition data.
+                return false;
+            }
+            selectedPartitions.selectedPartitions.forEach(
+                    (key, value) -> requiredPartitionSpecs.add(new PartitionSpec(key))
+            );
+        }
+
         Set<String> requiredSlots =
                 desc.getSlots().stream().map(e -> e.getColumn().getName()).collect(Collectors.toSet());

@@ -150,6 +184,7 @@ void createTableBatchReadSession() throws UserException {
                 .withSettings(mcCatalog.getSettings())
                 .withSplitOptions(mcCatalog.getSplitOption())
                 .requiredPartitionColumns(requiredPartitionColumns)
+                .requiredPartitions(requiredPartitionSpecs)
                 .requiredDataColumns(orderedRequiredDataColumns)
                 .withArrowOptions(
                         ArrowOptions.newBuilder()
@@ -162,7 +197,7 @@
         } catch (java.io.IOException e) {
             throw new RuntimeException(e);
         }
-
+        return true;
     }

     @Override
@@ -430,7 +465,10 @@ public List<Split> getSplits() throws UserException {
         if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
             return result;
         }
-        createTableBatchReadSession();
+
+        if (!createTableBatchReadSession()) {
+            return result;
+        }

         try {
             String scanSessionSerialize = serializeSession(tableBatchReadSession);
@@ -582,7 +582,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
         } else if (table instanceof TrinoConnectorExternalTable) {
             scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         } else if (table instanceof MaxComputeExternalTable) {
-            scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+            scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor,
+                    fileScan.getSelectedPartitions(), false);
         } else if (table instanceof LakeSoulExternalTable) {
             scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         } else {
@@ -55,7 +55,7 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> {
     /** Different type of table may have different partition prune behavior. */
     public enum PartitionTableType {
         OLAP,
-        HIVE
+        EXTERNAL
     }

     private PartitionPruner(List<OnePartitionEvaluator> partitions, Expression partitionPredicate) {
@@ -59,7 +59,7 @@ public Rule build() {
             ExternalTable tbl = scan.getTable();

             SelectedPartitions selectedPartitions;
-            if (tbl.supportPartitionPruned()) {
+            if (tbl.supportInternalPartitionPruned()) {
                 selectedPartitions = pruneExternalPartitions(tbl, filter, scan, ctx.cascadesContext);
             } else {
                 // set isPruned so that it won't go pass the partition prune again
@@ -91,7 +91,7 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable,

         Map<String, PartitionItem> nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
         List<String> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
-                partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE));
+                partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.EXTERNAL));

         for (String name : prunedPartitions) {
             selectedPartitionItems.put(name, nameToPartitionItem.get(name));
@@ -76,7 +76,6 @@
 import org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
-import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TPushAggOp;

 import com.google.common.base.Preconditions;
@@ -1993,9 +1992,7 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s
                 scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
-                // TODO: support max compute scan node
-                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "MCScanNode",
-                        StatisticalType.MAX_COMPUTE_SCAN_NODE, true);
+                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
             case ES_EXTERNAL_TABLE:
                 scanNode = new EsScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
