
Commit

try a different implementation
huaxingao committed Nov 2, 2024
1 parent fda2b3a commit 5f7fed8
Showing 13 changed files with 92 additions and 56 deletions.
30 changes: 30 additions & 0 deletions .palantir/revapi.yml
@@ -1145,6 +1145,36 @@ acceptedBreaks:
new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\
\ org.apache.iceberg.TableMetadata)"
justification: "Removing deprecated code"
org.apache.iceberg:iceberg-data:
- code: "java.method.numberOfParametersChanged"
old: "method void org.apache.iceberg.data.DeleteFilter<T>::<init>(java.lang.String,\
\ java.util.List<org.apache.iceberg.DeleteFile>, org.apache.iceberg.Schema,\
\ org.apache.iceberg.Schema, org.apache.iceberg.deletes.DeleteCounter)"
new: "method void org.apache.iceberg.data.DeleteFilter<T>::<init>(java.lang.String,\
\ java.util.List<org.apache.iceberg.DeleteFile>, org.apache.iceberg.Schema,\
\ org.apache.iceberg.Schema, org.apache.iceberg.deletes.DeleteCounter, boolean)"
justification: "add a param to tell if the pos column needs to ve included"
org.apache.iceberg:iceberg-parquet:
- code: "java.method.numberOfParametersChanged"
old: "method void org.apache.iceberg.parquet.ParquetReader<T>::<init>(org.apache.iceberg.io.InputFile,\
\ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function<org.apache.parquet.schema.MessageType,\
\ org.apache.iceberg.parquet.ParquetValueReader<?>>, org.apache.iceberg.mapping.NameMapping,\
\ org.apache.iceberg.expressions.Expression, boolean, boolean)"
new: "method void org.apache.iceberg.parquet.ParquetReader<T>::<init>(org.apache.iceberg.io.InputFile,\
\ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function<org.apache.parquet.schema.MessageType,\
\ org.apache.iceberg.parquet.ParquetValueReader<?>>, org.apache.iceberg.mapping.NameMapping,\
\ org.apache.iceberg.expressions.Expression, boolean, boolean, boolean)"
justification: "add a param to pass pos delete info"
- code: "java.method.numberOfParametersChanged"
old: "method void org.apache.iceberg.parquet.VectorizedParquetReader<T>::<init>(org.apache.iceberg.io.InputFile,\
\ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function<org.apache.parquet.schema.MessageType,\
\ org.apache.iceberg.parquet.VectorizedReader<?>>, org.apache.iceberg.mapping.NameMapping,\
\ org.apache.iceberg.expressions.Expression, boolean, boolean, int)"
new: "method void org.apache.iceberg.parquet.VectorizedParquetReader<T>::<init>(org.apache.iceberg.io.InputFile,\
\ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function<org.apache.parquet.schema.MessageType,\
\ org.apache.iceberg.parquet.VectorizedReader<?>>, org.apache.iceberg.mapping.NameMapping,\
\ org.apache.iceberg.expressions.Expression, boolean, boolean, int, boolean)"
justification: "add a param to pass pos delete info"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
13 changes: 8 additions & 5 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -69,7 +69,8 @@ protected DeleteFilter(
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
DeleteCounter counter) {
DeleteCounter counter,
boolean isBatchReading) {
this.filePath = filePath;
this.counter = counter;

@@ -93,7 +94,8 @@ protected DeleteFilter(

this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
this.requiredSchema =
fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, isBatchReading);
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
this.hasIsDeletedColumn =
requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
@@ -102,7 +104,7 @@

protected DeleteFilter(
String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema requestedSchema) {
this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter());
this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter(), false);
}

protected int columnIsDeletedPosition() {
@@ -251,13 +253,14 @@ private static Schema fileProjection(
Schema tableSchema,
Schema requestedSchema,
List<DeleteFile> posDeletes,
List<DeleteFile> eqDeletes) {
List<DeleteFile> eqDeletes,
boolean isBatchReading) {
if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
return requestedSchema;
}

Set<Integer> requiredIds = Sets.newLinkedHashSet();
if (!posDeletes.isEmpty()) {
if (!posDeletes.isEmpty() && !isBatchReading) {
requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}

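Note on the DeleteFilter change above: the new isBatchReading flag only influences fileProjection. A condensed, hedged sketch of the updated guard, with the rationale taken from the comment removed from BaseBatchReader further down (the vectorized Spark path reads only the requested columns and does not want _pos projected):

    // Condensed from fileProjection(...) above; posDeletes and eqDeletes are the
    // delete-file lists built in the DeleteFilter constructor.
    Set<Integer> requiredIds = Sets.newLinkedHashSet();
    if (!posDeletes.isEmpty() && !isBatchReading) {
      // Row-based reads still project _pos so position deletes can be matched;
      // batch reads skip it and pass the pos-delete information to Parquet via
      // the new hasPositionDelete builder flag instead (see Parquet.java below).
      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
    }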
19 changes: 17 additions & 2 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -1048,6 +1048,7 @@ public static class ReadBuilder {
private NameMapping nameMapping = null;
private ByteBuffer fileEncryptionKey = null;
private ByteBuffer fileAADPrefix = null;
private boolean hasPositionDelete = false;

private ReadBuilder(InputFile file) {
this.file = file;
Expand Down Expand Up @@ -1155,6 +1156,11 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
return this;
}

public ReadBuilder hasPositionDelete(boolean positionDelete) {
this.hasPositionDelete = positionDelete;
return this;
}

@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
@@ -1216,10 +1222,19 @@ public <D> CloseableIterable<D> build() {
filter,
reuseContainers,
caseSensitive,
maxRecordsPerBatch);
maxRecordsPerBatch,
hasPositionDelete);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive);
file,
schema,
options,
readerFunc,
mapping,
filter,
reuseContainers,
caseSensitive,
hasPositionDelete);
}
}

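For context, a hedged sketch of how the new builder flag is meant to be set by a caller; it mirrors the chain in BaseBatchReader.newParquetIterable further down, and the concrete names here (inputFile, readerFunc, batchSize, residual, deleteFilter) are placeholders borrowed from that Spark reader rather than identifiers introduced by this commit:

    CloseableIterable<ColumnarBatch> batches =
        Parquet.read(inputFile)
            .project(requiredSchema)
            .createBatchedReaderFunc(readerFunc)
            .recordsPerBatch(batchSize)
            .filter(residual)
            .reuseContainers()
            // new: forwarded through VectorizedParquetReader (or ParquetReader) into ReadConf
            .hasPositionDelete(deleteFilter != null && deleteFilter.hasPosDeletes())
            .build();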
@@ -43,6 +43,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final boolean reuseContainers;
private final boolean caseSensitive;
private final NameMapping nameMapping;
private final boolean hasPositionDelete;

public ParquetReader(
InputFile input,
@@ -52,7 +53,8 @@ public ParquetReader(
NameMapping nameMapping,
Expression filter,
boolean reuseContainers,
boolean caseSensitive) {
boolean caseSensitive,
boolean hasPositionDelete) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
@@ -62,6 +64,7 @@
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
this.nameMapping = nameMapping;
this.hasPositionDelete = hasPositionDelete;
}

private ReadConf<T> conf = null;
@@ -79,7 +82,8 @@ private ReadConf<T> init() {
nameMapping,
reuseContainers,
caseSensitive,
null);
null,
hasPositionDelete);
this.conf = readConf.copy();
return readConf;
}
13 changes: 8 additions & 5 deletions parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
@@ -25,7 +25,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
@@ -60,6 +59,7 @@ class ReadConf<T> {
private final boolean reuseContainers;
private final Integer batchSize;
private final long[] startRowPositions;
private final boolean hasPositionDelete;

// List of column chunk metadata for each row group
private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetaDataForRowGroups;
@@ -75,7 +75,8 @@ class ReadConf<T> {
NameMapping nameMapping,
boolean reuseContainers,
boolean caseSensitive,
Integer bSize) {
Integer bSize,
boolean positionDelete) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
@@ -96,9 +97,10 @@ class ReadConf<T> {
this.rowGroups = reader.getRowGroups();
this.shouldSkip = new boolean[rowGroups.size()];
this.startRowPositions = new long[rowGroups.size()];
this.hasPositionDelete = positionDelete;

// Fetch all row groups starting positions to compute the row offsets of the filtered row groups
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos(expectedSchema);
Map<Long, Long> offsetToStartPos = generateOffsetToStartPos();

ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
@@ -156,6 +158,7 @@ private ReadConf(ReadConf<T> toCopy) {
this.vectorizedModel = toCopy.vectorizedModel;
this.columnChunkMetaDataForRowGroups = toCopy.columnChunkMetaDataForRowGroups;
this.startRowPositions = toCopy.startRowPositions;
this.hasPositionDelete = toCopy.hasPositionDelete;
}

ParquetFileReader reader() {
@@ -181,8 +184,8 @@ boolean[] shouldSkip() {
return shouldSkip;
}

private Map<Long, Long> generateOffsetToStartPos(Schema schema) {
if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) {
private Map<Long, Long> generateOffsetToStartPos() {
if (hasPositionDelete) {
return null;
}

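The only logic change in ReadConf is the guard at the top of generateOffsetToStartPos, which now keys off the flag handed in by the readers instead of inspecting the projected schema. Note that the row-based Spark call site further down passes readSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null for this flag, which reproduces the old schema-based check on that path. Side by side, copied from the hunks above:

    // old: skip computing row-group start positions when _pos is not in the projected schema
    //   if (schema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null) { return null; }
    // new: skip them when the reader passed hasPositionDelete = true
    //   if (hasPositionDelete) { return null; }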
@@ -49,6 +49,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
private final boolean caseSensitive;
private final int batchSize;
private final NameMapping nameMapping;
private final boolean hasPositionDelete;

public VectorizedParquetReader(
InputFile input,
@@ -59,7 +60,8 @@ public VectorizedParquetReader(
Expression filter,
boolean reuseContainers,
boolean caseSensitive,
int maxRecordsPerBatch) {
int maxRecordsPerBatch,
boolean hasPositionDelete) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
@@ -70,6 +72,7 @@
this.caseSensitive = caseSensitive;
this.batchSize = maxRecordsPerBatch;
this.nameMapping = nameMapping;
this.hasPositionDelete = hasPositionDelete;
}

private ReadConf conf = null;
@@ -87,7 +90,8 @@ private ReadConf init() {
nameMapping,
reuseContainers,
caseSensitive,
batchSize);
batchSize,
hasPositionDelete);
this.conf = readConf.copy();
return readConf;
}
@@ -18,10 +18,8 @@
*/
package org.apache.iceberg.spark.source;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
@@ -37,7 +35,6 @@
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -84,15 +81,15 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
Schema vectorizationSchema = vectorizationSchema(deleteFilter);
boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;

return Parquet.read(inputFile)
.project(requiredSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
vectorizationSchema, fileSchema, idToConstant, deleteFilter))
requiredSchema, fileSchema, idToConstant, deleteFilter))
.recordsPerBatch(batchSize)
.filter(residual)
.caseSensitive(caseSensitive())
@@ -101,6 +98,7 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
// read performance as every batch read doesn't have to pay the cost of allocating memory.
.reuseContainers()
.withNameMapping(nameMapping())
.hasPositionDelete(hasPositionDelete)
.build();
}

@@ -129,29 +127,4 @@ private CloseableIterable<ColumnarBatch> newOrcIterable(
.withNameMapping(nameMapping())
.build();
}

private Schema vectorizationSchema(SparkDeleteFilter deleteFilter) {
// For pos delete, deleteFilter has appended _pos to the required schema.
// For example, SELECT id, data FROM test, the requested schema is id and data. If there
// is position delete, deleteFilter will append _pos to the schema so the schema becomes
// id, data and _pos. However, vectorization reader only needs to read the requested columns,
// i.e. id and data, so we want to remove the _pos from the schema when building the
// vectorization reader. Before removing _pos, we need to make sure _pos is not explicitly
// selected in the query.
if (deleteFilter != null) {
if (deleteFilter.hasPosDeletes()
&& expectedSchema().findType(MetadataColumns.ROW_POSITION.name()) == null) {
List<String> columnNameWithoutPos =
deleteFilter.requiredSchema().columns().stream()
.map(Types.NestedField::name)
.filter(name -> !name.equals(MetadataColumns.ROW_POSITION.name()))
.collect(Collectors.toList());
return deleteFilter.requiredSchema().select(columnNameWithoutPos);
} else {
return deleteFilter.requiredSchema();
}
} else {
return expectedSchema();
}
}
}
@@ -249,8 +249,9 @@ protected static Object convertConstant(Type type, Object value) {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;

SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
SparkDeleteFilter(
String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean isBatchReading) {
super(filePath, deletes, tableSchema, expectedSchema, counter, isBatchReading);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
@@ -98,6 +98,7 @@ private CloseableIterable<InternalRow> newParquetIterable(
.filter(residual)
.caseSensitive(caseSensitive())
.withNameMapping(nameMapping())
.hasPositionDelete(readSchema.findField(MetadataColumns.ROW_POSITION.fieldId()) == null)
.build();
}

@@ -93,10 +93,10 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
InputFile inputFile = getInputFile(filePath);
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");

SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
: new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter sparkDeleteFilter =
new SparkDeleteFilter(filePath, task.deletes(), counter(), true);

SparkDeleteFilter deleteFilter = task.deletes().isEmpty() ? null : sparkDeleteFilter;

return newBatchIterable(
inputFile,
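A small note on the hunk above: the batch path now always constructs the SparkDeleteFilter with isBatchReading = true and only afterwards decides whether to use it, so a filter is built even for tasks without deletes. A hedged sketch of an equivalent form that keeps the previous lazy construction, offered purely as a readability suggestion and not part of this commit:

    SparkDeleteFilter deleteFilter =
        task.deletes().isEmpty()
            ? null
            : new SparkDeleteFilter(filePath, task.deletes(), counter(), true /* isBatchReading */);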
@@ -111,13 +111,14 @@ private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask t

CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
SparkDeleteFilter deletes =
new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), false);
return deletes.filter(rows(task, deletes.requiredSchema()));
}

@@ -41,7 +41,7 @@ public EqualityDeleteRowReader(
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), false);

// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
@@ -83,7 +83,8 @@ protected Stream<ContentFile<?>> referencedFiles(FileScanTask task) {
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
SparkDeleteFilter deleteFilter =
new SparkDeleteFilter(filePath, task.deletes(), counter(), false);

// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
