Core: Support DVs in DeleteLoader
aokolnychyi committed Nov 6, 2024
1 parent 7938403 commit e3d3734
Showing 2 changed files with 287 additions and 1 deletion.
67 changes: 67 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -42,15 +42,20 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.OrcRowReader;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -146,6 +151,26 @@ private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
@Override
public PositionDeleteIndex loadPositionDeletes(
Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
if (containsDVs(deleteFiles)) {
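// Format v3 allows at most one DV per data file and forbids mixing DVs
// with position delete files for the same file, hence getOnlyElement.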
DeleteFile dv = Iterables.getOnlyElement(deleteFiles);
validateDV(dv, filePath);
return readDV(dv); // TODO: support caching entire DV files
} else {
return getOrReadPosDeletes(deleteFiles, filePath);
}
}

private PositionDeleteIndex readDV(DeleteFile dv) {
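// A DV (deletion vector) is a serialized position bitmap stored as a blob
// in a Puffin file; contentOffset() and contentSizeInBytes() locate the
// blob, so it can be fetched with a single ranged read instead of
// scanning delete records row by row.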
LOG.trace("Opening DV file {}", dv.location());
InputFile inputFile = loadInputFile.apply(dv);
long offset = dv.contentOffset();
int length = dv.contentSizeInBytes().intValue();
byte[] bytes = readBytes(inputFile, offset, length);
return PositionDeleteIndex.deserialize(bytes, dv);
}

private PositionDeleteIndex getOrReadPosDeletes(
Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
Iterable<PositionDeleteIndex> deletes =
execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile, filePath));
return PositionDeleteIndexUtil.merge(deletes);
@@ -259,4 +284,46 @@ private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) {
private int estimateRecordSize(Schema schema) {
return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
}

private boolean containsDVs(Iterable<DeleteFile> deleteFiles) {
return Iterables.any(deleteFiles, ContentFileUtil::isDV);
}

private void validateDV(DeleteFile dv, CharSequence filePath) {
Preconditions.checkArgument(
dv.contentOffset() != null,
"Invalid DV, offset cannot be null: %s",
ContentFileUtil.dvDesc(dv));
Preconditions.checkArgument(
dv.contentSizeInBytes() != null,
"Invalid DV, length cannot be null: %s",
ContentFileUtil.dvDesc(dv));
Preconditions.checkArgument(
dv.contentSizeInBytes() <= Integer.MAX_VALUE,
"Can't read DV larger than 2GB: %s",
dv.contentSizeInBytes());
Preconditions.checkArgument(
filePath.toString().equals(dv.referencedDataFile()),
"DV is expected to reference %s, not %s",
filePath,
dv.referencedDataFile());
}

private byte[] readBytes(InputFile inputFile, long offset, int length) {
try (SeekableInputStream stream = inputFile.newStream()) {
byte[] bytes = new byte[length];

if (stream instanceof RangeReadable) {
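// RangeReadable streams (common for object stores) can fill the buffer
// with a positioned read at an absolute offset in one call, avoiding a
// separate seek.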
RangeReadable rangeReadable = (RangeReadable) stream;
rangeReadable.readFully(offset, bytes);
} else {
stream.seek(offset);
ByteStreams.readFully(stream, bytes);
}

return bytes;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
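For reference, a minimal caller-side sketch of the new DV path. The names here are hypothetical: it assumes an existing table, a committed data file dataFile, and a DV dv that references it (ContentFileUtil.isDV(dv) is true).

DeleteLoader loader =
    new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile.location()));

// With a DV present, loadPositionDeletes() range-reads the serialized
// bitmap at [contentOffset, contentOffset + contentSizeInBytes) from the
// Puffin file and deserializes it, instead of scanning delete records.
PositionDeleteIndex index =
    loader.loadPositionDeletes(ImmutableList.of(dv), dataFile.location());

boolean deleted = index.isDeleted(0L); // was position 0 of the data file deleted?

Per the TODO in loadPositionDeletes, DV blobs are not yet cached the way position delete files are, so each call re-reads the Puffin file.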
221 changes: 220 additions & 1 deletion data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.util.Arrays;
@@ -28,17 +29,25 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -49,10 +58,11 @@ public abstract class TestDVWriters<T> extends WriterTestBase<T> {

@Parameters(name = "formatVersion = {0}")
protected static List<Object> parameters() {
- return Arrays.asList(new Object[] {3});
+ return Arrays.asList(new Object[] {2, 3});
}

private OutputFileFactory fileFactory = null;
private OutputFileFactory parquetFileFactory = null;

protected abstract StructLikeSet toSet(Iterable<T> records);

@@ -65,10 +75,14 @@ protected FileFormat dataFormat() {
public void setupTable() throws Exception {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
this.parquetFileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
}

@TestTemplate
public void testBasicDVs() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add the first data file
@@ -100,6 +114,211 @@ public void testBasicDVs() throws IOException {
.contains(dataFile1.location())
.contains(dataFile2.location());
assertThat(result.referencesDataFiles()).isTrue();

// commit the deletes
commit(result);

// verify correctness
assertRows(ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")));
}

@TestTemplate
public void testRewriteDVs() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add a data file with 3 data records
List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows, table.spec(), null);
table.newFastAppend().appendFile(dataFile).commit();

// write the first DV
DVFileWriter dvWriter1 =
new BaseDVFileWriter(fileFactory, new PreviousDeleteLoader(table, ImmutableMap.of()));
dvWriter1.delete(dataFile.location(), 1L, table.spec(), null);
dvWriter1.close();

// validate the writer result
DeleteWriteResult result1 = dvWriter1.result();
assertThat(result1.deleteFiles()).hasSize(1);
assertThat(result1.referencedDataFiles()).containsOnly(dataFile.location());
assertThat(result1.referencesDataFiles()).isTrue();
assertThat(result1.rewrittenDeleteFiles()).isEmpty();

// commit the first DV
commit(result1);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();

// verify correctness after committing the first DV
assertRows(ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa")));

// write the second DV, merging with the first one
DeleteFile dv1 = Iterables.getOnlyElement(result1.deleteFiles());
DVFileWriter dvWriter2 =
new BaseDVFileWriter(
fileFactory,
new PreviousDeleteLoader(table, ImmutableMap.of(dataFile.location(), dv1)));
dvWriter2.delete(dataFile.location(), 2L, table.spec(), null);
dvWriter2.close();

// validate the writer result
DeleteWriteResult result2 = dvWriter2.result();
assertThat(result2.deleteFiles()).hasSize(1);
assertThat(result2.referencedDataFiles()).containsOnly(dataFile.location());
assertThat(result2.referencesDataFiles()).isTrue();
assertThat(result2.rewrittenDeleteFiles()).hasSize(1);

// replace DVs
commit(result2);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);

// verify correctness after replacing DVs
assertRows(ImmutableList.of(toRow(1, "aaa")));
}

@TestTemplate
public void testRewriteFileScopedPositionDeletes() throws IOException {
assumeThat(formatVersion).isEqualTo(2);

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add a data file with 3 records
List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows, table.spec(), null);
table.newFastAppend().appendFile(dataFile).commit();

// add a file-scoped position delete file
DeleteFile deleteFile =
writePositionDeletes(writerFactory, ImmutableList.of(Pair.of(dataFile.location(), 0L)));
table.newRowDelta().addDeletes(deleteFile).commit();

// verify correctness after adding the file-scoped position delete
assertRows(ImmutableList.of(toRow(2, "aaa"), toRow(3, "aaa")));

// upgrade the table to V3 to enable DVs
table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();

// write a DV, merging with the file-scoped position delete
DVFileWriter dvWriter =
new BaseDVFileWriter(
fileFactory,
new PreviousDeleteLoader(table, ImmutableMap.of(dataFile.location(), deleteFile)));
dvWriter.delete(dataFile.location(), 1L, table.spec(), null);
dvWriter.close();

// validate the writer result
DeleteWriteResult result = dvWriter.result();
assertThat(result.deleteFiles()).hasSize(1);
assertThat(result.referencedDataFiles()).containsOnly(dataFile.location());
assertThat(result.referencesDataFiles()).isTrue();
assertThat(result.rewrittenDeleteFiles()).hasSize(1);

// replace the position delete file with the DV
commit(result);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);

// verify correctness
assertRows(ImmutableList.of(toRow(3, "aaa")));
}

@TestTemplate
public void testApplyPartitionScopedPositionDeletes() throws IOException {
assumeThat(formatVersion).isEqualTo(2);

FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add the first data file with 3 records
List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
DataFile dataFile1 = writeData(writerFactory, parquetFileFactory, rows1, table.spec(), null);
table.newFastAppend().appendFile(dataFile1).commit();

// add the second data file with 3 records
List<T> rows2 = ImmutableList.of(toRow(4, "aaa"), toRow(5, "aaa"), toRow(6, "aaa"));
DataFile dataFile2 = writeData(writerFactory, parquetFileFactory, rows2, table.spec(), null);
table.newFastAppend().appendFile(dataFile2).commit();

// add a position delete file with deletes for both data files
DeleteFile deleteFile =
writePositionDeletes(
writerFactory,
ImmutableList.of(
Pair.of(dataFile1.location(), 0L),
Pair.of(dataFile1.location(), 1L),
Pair.of(dataFile2.location(), 0L)));
table.newRowDelta().addDeletes(deleteFile).commit();

// verify correctness with the position delete file
assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(5, "aaa"), toRow(6, "aaa")));

// upgrade the table to V3 to enable DVs
table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();

// write a DV, applying old positions but keeping the position delete file in place
DVFileWriter dvWriter =
new BaseDVFileWriter(
fileFactory,
new PreviousDeleteLoader(table, ImmutableMap.of(dataFile2.location(), deleteFile)));
dvWriter.delete(dataFile2.location(), 1L, table.spec(), null);
dvWriter.close();

// validate the writer result
DeleteWriteResult result = dvWriter.result();
assertThat(result.deleteFiles()).hasSize(1);
assertThat(result.referencedDataFiles()).containsOnly(dataFile2.location());
assertThat(result.referencesDataFiles()).isTrue();
assertThat(result.rewrittenDeleteFiles()).isEmpty();
DeleteFile dv = Iterables.getOnlyElement(result.deleteFiles());

// commit the DV, ensuring the position delete file remains
commit(result);
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();

// verify correctness with DVs and position delete files
assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(6, "aaa")));

// verify the position delete file applies only to the data file without the DV
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
for (FileScanTask task : tasks) {
DeleteFile taskDeleteFile = Iterables.getOnlyElement(task.deletes());
if (task.file().location().equals(dataFile1.location())) {
assertThat(taskDeleteFile.location()).isEqualTo(deleteFile.location());
} else {
assertThat(taskDeleteFile.location()).isEqualTo(dv.location());
}
}
}
}

private void commit(DeleteWriteResult result) {
RowDelta rowDelta = table.newRowDelta();
result.rewrittenDeleteFiles().forEach(rowDelta::removeDeletes);
result.deleteFiles().forEach(rowDelta::addDeletes);
rowDelta.commit();
}

private void assertRows(Iterable<T> expectedRows) throws IOException {
assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
}

private DeleteFile writePositionDeletes(
FileWriterFactory<T> writerFactory, List<Pair<String, Long>> deletes) throws IOException {
EncryptedOutputFile file = parquetFileFactory.newOutputFile(table.spec(), null);
PositionDeleteWriter<T> writer =
writerFactory.newPositionDeleteWriter(file, table.spec(), null);
PositionDelete<T> posDelete = PositionDelete.create();

try (PositionDeleteWriter<T> closeableWriter = writer) {
for (Pair<String, Long> delete : deletes) {
closeableWriter.write(posDelete.set(delete.first(), delete.second()));
}
}

return writer.toDeleteFile();
}

private static class PreviousDeleteLoader implements Function<String, PositionDeleteIndex> {