diff --git a/api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java b/api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java index d48b268287c3..b67beea7816f 100644 --- a/api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java +++ b/api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.util.ContentFileUtil; /** * A scan task for inserts generated by adding a data file to the table. @@ -55,7 +56,7 @@ default ChangelogOperation operation() { @Override default long sizeBytes() { - return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + return length() + ContentFileUtil.contentSizeInBytes(deletes()); } @Override diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 3c6d77f34d8f..ea6262afac85 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -104,12 +104,21 @@ public interface DataFile extends ContentFile { "referenced_data_file", StringType.get(), "Fully qualified location (URI with FS scheme) of a data file that all deletes reference"); + Types.NestedField CONTENT_OFFSET = + optional( + 144, "content_offset", LongType.get(), "The offset in the file where the content starts"); + Types.NestedField CONTENT_SIZE = + optional( + 145, + "content_size_in_bytes", + LongType.get(), + "The length of referenced content stored in the file"); int PARTITION_ID = 102; String PARTITION_NAME = "partition"; String PARTITION_DOC = "Partition data tuple, schema based on the partition spec"; - // NEXT ID TO ASSIGN: 144 + // NEXT ID TO ASSIGN: 146 static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry @@ -131,7 +140,9 @@ static StructType getType(StructType partitionType) { SPLIT_OFFSETS, EQUALITY_IDS, SORT_ORDER_ID, - REFERENCED_DATA_FILE); + 
REFERENCED_DATA_FILE, + CONTENT_OFFSET, + CONTENT_SIZE); } /** diff --git a/api/src/main/java/org/apache/iceberg/DeleteFile.java b/api/src/main/java/org/apache/iceberg/DeleteFile.java index 8e17e60fcccf..340a00e36b17 100644 --- a/api/src/main/java/org/apache/iceberg/DeleteFile.java +++ b/api/src/main/java/org/apache/iceberg/DeleteFile.java @@ -42,4 +42,26 @@ default List splitOffsets() { default String referencedDataFile() { return null; } + + /** + * Returns the offset in the file where the content starts. + * + *

<p>The content offset is required for deletion vectors and points to the start of the deletion + * vector blob in the Puffin file, enabling direct access. This method always returns null for + * equality and position delete files. + */ + default Long contentOffset() { + return null; + } + + /** + * Returns the length of referenced content stored in the file. + * + *

<p>The content size is required for deletion vectors and indicates the size of the deletion + * vector blob in the Puffin file, enabling direct access. This method always returns null for + * equality and position delete files. + */ + default Long contentSizeInBytes() { + return null; + } } diff --git a/api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java b/api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java index 9edd6afd0cea..4e1fc8c44379 100644 --- a/api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java +++ b/api/src/main/java/org/apache/iceberg/DeletedDataFileScanTask.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.util.ContentFileUtil; /** * A scan task for deletes generated by removing a data file from the table. @@ -54,7 +55,7 @@ default ChangelogOperation operation() { @Override default long sizeBytes() { - return length() + existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + return length() + ContentFileUtil.contentSizeInBytes(existingDeletes()); } @Override diff --git a/api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java b/api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java index 131edfddd349..ff176a90a06f 100644 --- a/api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java +++ b/api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.util.ContentFileUtil; /** * A scan task for deletes generated by adding delete files to the table. 
@@ -63,9 +64,9 @@ default ChangelogOperation operation() { @Override default long sizeBytes() { - return length() - + addedDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum() - + existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + long addedDeletesSize = ContentFileUtil.contentSizeInBytes(addedDeletes()); + long existingDeletesSize = ContentFileUtil.contentSizeInBytes(existingDeletes()); + return length() + addedDeletesSize + existingDeletesSize; } @Override diff --git a/api/src/main/java/org/apache/iceberg/FileFormat.java b/api/src/main/java/org/apache/iceberg/FileFormat.java index d662437d5ddb..6b41aec42c3e 100644 --- a/api/src/main/java/org/apache/iceberg/FileFormat.java +++ b/api/src/main/java/org/apache/iceberg/FileFormat.java @@ -24,6 +24,7 @@ /** Enum of supported file formats. */ public enum FileFormat { + PUFFIN("puffin", false), ORC("orc", true), PARQUET("parquet", true), AVRO("avro", true), diff --git a/api/src/main/java/org/apache/iceberg/FileScanTask.java b/api/src/main/java/org/apache/iceberg/FileScanTask.java index 5fb4b55459e3..97ea3d10d72e 100644 --- a/api/src/main/java/org/apache/iceberg/FileScanTask.java +++ b/api/src/main/java/org/apache/iceberg/FileScanTask.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.util.ContentFileUtil; /** A scan task over a range of bytes in a single data file. 
*/ public interface FileScanTask extends ContentScanTask, SplittableScanTask { @@ -36,7 +37,7 @@ default Schema schema() { @Override default long sizeBytes() { - return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(); + return length() + ContentFileUtil.contentSizeInBytes(deletes()); } @Override diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/api/src/main/java/org/apache/iceberg/MetadataColumns.java similarity index 100% rename from core/src/main/java/org/apache/iceberg/MetadataColumns.java rename to api/src/main/java/org/apache/iceberg/MetadataColumns.java diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/api/src/main/java/org/apache/iceberg/Partitioning.java similarity index 100% rename from core/src/main/java/org/apache/iceberg/Partitioning.java rename to api/src/main/java/org/apache/iceberg/Partitioning.java diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java similarity index 81% rename from core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java rename to api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java index c82b3ff828cf..bc79373ba707 100644 --- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java +++ b/api/src/main/java/org/apache/iceberg/util/ContentFileUtil.java @@ -24,6 +24,7 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; @@ -84,4 +85,25 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) { CharSequence location = referencedDataFile(deleteFile); return location != null ? 
location.toString() : null; } + + public static long contentSizeInBytes(ContentFile file) { + if (file.content() == FileContent.DATA) { + return file.fileSizeInBytes(); + } else { + DeleteFile deleteFile = (DeleteFile) file; + return isDV(deleteFile) ? deleteFile.contentSizeInBytes() : deleteFile.fileSizeInBytes(); + } + } + + public static long contentSizeInBytes(Iterable> files) { + long size = 0L; + for (ContentFile file : files) { + size += contentSizeInBytes(file); + } + return size; + } + + public static boolean isDV(DeleteFile deleteFile) { + return deleteFile.format() == FileFormat.PUFFIN; + } } diff --git a/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java b/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java index bbe9824963fc..06ddd1869ace 100644 --- a/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java +++ b/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java @@ -97,13 +97,14 @@ public boolean equals(Object o) { } DeleteFileWrapper that = (DeleteFileWrapper) o; - // this needs to be updated once deletion vector support is added - return Objects.equals(file.location(), that.file.location()); + return Objects.equals(file.location(), that.file.location()) + && Objects.equals(file.contentOffset(), that.file.contentOffset()) + && Objects.equals(file.contentSizeInBytes(), that.file.contentSizeInBytes()); } @Override public int hashCode() { - return Objects.hashCode(file.location()); + return Objects.hash(file.location(), file.contentOffset(), file.contentSizeInBytes()); } @Override diff --git a/api/src/test/java/org/apache/iceberg/util/TestContentFileUtil.java b/api/src/test/java/org/apache/iceberg/util/TestContentFileUtil.java new file mode 100644 index 000000000000..c86e282e83f1 --- /dev/null +++ b/api/src/test/java/org/apache/iceberg/util/TestContentFileUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TestContentFileUtil { + + @Test + public void testContentSize() { + DeleteFile dv1 = mockDV("dv1.puffin", 20L, 25L, "data1.parquet"); + DeleteFile dv2 = mockDV("dv2.puffin", 4L, 15L, "data2.parquet"); + + long size1 = ContentFileUtil.contentSizeInBytes(ImmutableList.of()); + assertThat(size1).isEqualTo(0); + + long size2 = ContentFileUtil.contentSizeInBytes(ImmutableList.of(dv1)); + assertThat(size2).isEqualTo(25L); + + long size3 = ContentFileUtil.contentSizeInBytes(ImmutableList.of(dv1, dv2)); + assertThat(size3).isEqualTo(40L); + } + + private static DeleteFile mockDV( + String location, long contentOffset, long contentSize, String referencedDataFile) { + DeleteFile mockFile = Mockito.mock(DeleteFile.class); + Mockito.when(mockFile.format()).thenReturn(FileFormat.PUFFIN); + Mockito.when(mockFile.location()).thenReturn(location); + Mockito.when(mockFile.contentOffset()).thenReturn(contentOffset); + 
Mockito.when(mockFile.contentSizeInBytes()).thenReturn(contentSize); + Mockito.when(mockFile.referencedDataFile()).thenReturn(referencedDataFile); + return mockFile; + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index f4fd94724e95..e9724637dfa3 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -81,6 +81,8 @@ public PartitionData copy() { private byte[] keyMetadata = null; private Integer sortOrderId; private String referencedDataFile = null; + private Long contentOffset = null; + private Long contentSizeInBytes = null; // cached schema private transient Schema avroSchema = null; @@ -110,6 +112,8 @@ public PartitionData copy() { DataFile.EQUALITY_IDS, DataFile.SORT_ORDER_ID, DataFile.REFERENCED_DATA_FILE, + DataFile.CONTENT_OFFSET, + DataFile.CONTENT_SIZE, MetadataColumns.ROW_POSITION); /** Used by Avro reflection to instantiate this class when reading manifest files. */ @@ -152,7 +156,9 @@ public PartitionData copy() { int[] equalityFieldIds, Integer sortOrderId, ByteBuffer keyMetadata, - String referencedDataFile) { + String referencedDataFile, + Long contentOffset, + Long contentSizeInBytes) { super(BASE_TYPE.fields().size()); this.partitionSpecId = specId; this.content = content; @@ -182,6 +188,8 @@ public PartitionData copy() { this.sortOrderId = sortOrderId; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); this.referencedDataFile = referencedDataFile; + this.contentOffset = contentOffset; + this.contentSizeInBytes = contentSizeInBytes; } /** @@ -235,6 +243,8 @@ public PartitionData copy() { this.dataSequenceNumber = toCopy.dataSequenceNumber; this.fileSequenceNumber = toCopy.fileSequenceNumber; this.referencedDataFile = toCopy.referencedDataFile; + this.contentOffset = toCopy.contentOffset; + this.contentSizeInBytes = toCopy.contentSizeInBytes; } /** Constructor for Java serialization. 
*/ @@ -347,6 +357,12 @@ protected void internalSet(int pos, T value) { this.referencedDataFile = value != null ? value.toString() : null; return; case 18: + this.contentOffset = (Long) value; + return; + case 19: + this.contentSizeInBytes = (Long) value; + return; + case 20: this.fileOrdinal = (long) value; return; default: @@ -398,6 +414,10 @@ private Object getByPos(int basePos) { case 17: return referencedDataFile; case 18: + return contentOffset; + case 19: + return contentSizeInBytes; + case 20: return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + basePos); @@ -528,6 +548,14 @@ public String referencedDataFile() { return referencedDataFile; } + public Long contentOffset() { + return contentOffset; + } + + public Long contentSizeInBytes() { + return contentSizeInBytes; + } + private static Map copyMap(Map map, Set keys) { return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys); } @@ -580,6 +608,8 @@ public String toString() { .add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber) .add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber) .add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile) + .add("content_offset", contentOffset == null ? "null" : contentOffset) + .add("content_size_in_bytes", contentSizeInBytes == null ? 
"null" : contentSizeInBytes) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java index 2469395021d4..8b1d0bf665aa 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java +++ b/core/src/main/java/org/apache/iceberg/BaseFileScanTask.java @@ -23,6 +23,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.util.ContentFileUtil; public class BaseFileScanTask extends BaseContentScanTask implements FileScanTask { @@ -79,7 +80,7 @@ private long deletesSizeBytes() { if (deletesSizeBytes == 0L && deletes.length > 0) { long size = 0L; for (DeleteFile deleteFile : deletes) { - size += deleteFile.fileSizeInBytes(); + size += ContentFileUtil.contentSizeInBytes(deleteFile); } this.deletesSizeBytes = size; } @@ -180,11 +181,7 @@ public SplitScanTask merge(ScanTask other) { private long deletesSizeBytes() { if (deletesSizeBytes == 0L && fileScanTask.filesCount() > 1) { - long size = 0L; - for (DeleteFile deleteFile : fileScanTask.deletes()) { - size += deleteFile.fileSizeInBytes(); - } - this.deletesSizeBytes = size; + this.deletesSizeBytes = ContentFileUtil.contentSizeInBytes(fileScanTask.deletes()); } return deletesSizeBytes; diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index a011d03d59ad..618b2e95f29f 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -78,6 +78,8 @@ abstract class BaseScan> "key_metadata", "split_offsets", "referenced_data_file", + "content_offset", + "content_size_in_bytes", "equality_ids"); protected static final List DELETE_SCAN_WITH_STATS_COLUMNS = diff --git 
a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java index 96dfa5586c31..e6d7c8043f3f 100644 --- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java +++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java @@ -46,6 +46,8 @@ class ContentFileParser { private static final String EQUALITY_IDS = "equality-ids"; private static final String SORT_ORDER_ID = "sort-order-id"; private static final String REFERENCED_DATA_FILE = "referenced-data-file"; + private static final String CONTENT_OFFSET = "content-offset"; + private static final String CONTENT_SIZE = "content-size-in-bytes"; private ContentFileParser() {} @@ -116,6 +118,14 @@ static void toJson(ContentFile contentFile, PartitionSpec spec, JsonGenerator if (deleteFile.referencedDataFile() != null) { generator.writeStringField(REFERENCED_DATA_FILE, deleteFile.referencedDataFile()); } + + if (deleteFile.contentOffset() != null) { + generator.writeNumberField(CONTENT_OFFSET, deleteFile.contentOffset()); + } + + if (deleteFile.contentSizeInBytes() != null) { + generator.writeNumberField(CONTENT_SIZE, deleteFile.contentSizeInBytes()); + } } generator.writeEndObject(); @@ -155,6 +165,8 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode); Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode); String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode); + Long contentOffset = JsonUtil.getLongOrNull(CONTENT_OFFSET, jsonNode); + Long contentSizeInBytes = JsonUtil.getLongOrNull(CONTENT_SIZE, jsonNode); if (fileContent == FileContent.DATA) { return new GenericDataFile( @@ -180,7 +192,9 @@ static ContentFile fromJson(JsonNode jsonNode, PartitionSpec spec) { sortOrderId, splitOffsets, keyMetadata, - referencedDataFile); + referencedDataFile, + contentOffset, + contentSizeInBytes); } } diff --git 
a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index ef229593bcab..88619504279d 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -60,6 +60,8 @@ public static class Builder { private Integer sortOrderId = null; private List splitOffsets = null; private String referencedDataFile = null; + private Long contentOffset = null; + private Long contentSizeInBytes = null; Builder(PartitionSpec spec) { this.spec = spec; @@ -230,6 +232,16 @@ public Builder withReferencedDataFile(CharSequence newReferencedDataFile) { return this; } + public Builder withContentOffset(long newContentOffset) { + this.contentOffset = newContentOffset; + return this; + } + + public Builder withContentSizeInBytes(long newContentSizeInBytes) { + this.contentSizeInBytes = newContentSizeInBytes; + return this; + } + public DeleteFile build() { Preconditions.checkArgument(filePath != null, "File path is required"); if (format == null) { @@ -240,6 +252,14 @@ public DeleteFile build() { Preconditions.checkArgument(fileSizeInBytes >= 0, "File size is required"); Preconditions.checkArgument(recordCount >= 0, "Record count is required"); + if (format == FileFormat.PUFFIN) { + Preconditions.checkArgument(contentOffset != null, "Content offset is required for DV"); + Preconditions.checkArgument(contentSizeInBytes != null, "Content size is required for DV"); + } else { + Preconditions.checkArgument(contentOffset == null, "Content offset is only for DV"); + Preconditions.checkArgument(contentSizeInBytes == null, "Content size is only for DV"); + } + switch (content) { case POSITION_DELETES: Preconditions.checkArgument( @@ -273,7 +293,9 @@ public DeleteFile build() { sortOrderId, splitOffsets, keyMetadata, - referencedDataFile); + referencedDataFile, + contentOffset, + contentSizeInBytes); } } } diff --git 
a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index aa34cd22cdaa..a61cc1e0fb72 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -65,7 +65,9 @@ class GenericDataFile extends BaseFile implements DataFile { null /* no equality field IDs */, sortOrderId, keyMetadata, - null /* no referenced data file */); + null /* no referenced data file */, + null /* no content offset */, + null /* no content size */); } /** diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 05eb7c97dbab..9205551f24b3 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -49,7 +49,9 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { Integer sortOrderId, List splitOffsets, ByteBuffer keyMetadata, - String referencedDataFile) { + String referencedDataFile, + Long contentOffset, + Long contentSizeInBytes) { super( specId, content, @@ -68,7 +70,9 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { equalityFieldIds, sortOrderId, keyMetadata, - referencedDataFile); + referencedDataFile, + contentOffset, + contentSizeInBytes); } /** diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java index 1ea171c5b2c3..822ac6d160b1 100644 --- a/core/src/main/java/org/apache/iceberg/ScanSummary.java +++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java @@ -45,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ContentFileUtil; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.Pair; @@ -294,7 +295,7 @@ 
PartitionMetrics updateFromCounts( private PartitionMetrics updateFromFile(ContentFile file, Long timestampMillis) { this.fileCount += 1; this.recordCount += file.recordCount(); - this.totalSize += file.fileSizeInBytes(); + this.totalSize += ContentFileUtil.contentSizeInBytes(file); if (timestampMillis != null && (dataTimestampMillis == null || dataTimestampMillis < timestampMillis)) { this.dataTimestampMillis = timestampMillis; diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index daf1c3d72b89..45b71d654344 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -928,5 +928,15 @@ public Integer sortOrderId() { public String referencedDataFile() { return deleteFile.referencedDataFile(); } + + @Override + public Long contentOffset() { + return deleteFile.contentOffset(); + } + + @Override + public Long contentSizeInBytes() { + return deleteFile.contentSizeInBytes(); + } } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 22c9df2a8eaf..1278b82227e4 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -25,6 +25,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.ContentFileUtil; public class SnapshotSummary { public static final String ADDED_FILES_PROP = "added-data-files"; @@ -275,7 +276,7 @@ void addTo(ImmutableMap.Builder builder) { } void addedFile(ContentFile file) { - this.addedSize += file.fileSizeInBytes(); + this.addedSize += ContentFileUtil.contentSizeInBytes(file); switch (file.content()) { case DATA: this.addedFiles += 
1; @@ -298,7 +299,7 @@ void addedFile(ContentFile file) { } void removedFile(ContentFile file) { - this.removedSize += file.fileSizeInBytes(); + this.removedSize += ContentFileUtil.contentSizeInBytes(file); switch (file.content()) { case DATA: this.removedFiles += 1; diff --git a/core/src/main/java/org/apache/iceberg/V3Metadata.java b/core/src/main/java/org/apache/iceberg/V3Metadata.java index a418a868564e..70461ac74a70 100644 --- a/core/src/main/java/org/apache/iceberg/V3Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V3Metadata.java @@ -275,7 +275,9 @@ static Types.StructType fileType(Types.StructType partitionType) { DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, DataFile.SORT_ORDER_ID, - DataFile.REFERENCED_DATA_FILE); + DataFile.REFERENCED_DATA_FILE, + DataFile.CONTENT_OFFSET, + DataFile.CONTENT_SIZE); } static class IndexedManifestEntry> @@ -455,6 +457,18 @@ public Object get(int pos) { } else { return null; } + case 17: + if (wrapped.content() == FileContent.POSITION_DELETES) { + return ((DeleteFile) wrapped).contentOffset(); + } else { + return null; + } + case 18: + if (wrapped.content() == FileContent.POSITION_DELETES) { + return ((DeleteFile) wrapped).contentSizeInBytes(); + } else { + return null; + } } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java index c5aa6e1dd673..a4a01f7eb0ca 100644 --- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java +++ b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java @@ -21,6 +21,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.util.ContentFileUtil; public class ScanMetricsUtil { @@ -43,7 +44,7 @@ public static void fileTask(ScanMetrics metrics, DataFile dataFile, DeleteFile[] long deletesSizeInBytes = 0L; for 
(DeleteFile deleteFile : deleteFiles) { - deletesSizeInBytes += deleteFile.fileSizeInBytes(); + deletesSizeInBytes += ContentFileUtil.contentSizeInBytes(deleteFile); } metrics.totalDeleteFileSizeInBytes().increment(deletesSizeInBytes); diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java index e2dbcb61e9b7..a02868084435 100644 --- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java @@ -25,7 +25,6 @@ import org.apache.iceberg.BaseCombinedScanTask; import org.apache.iceberg.BaseScanTaskGroup; import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.ContentFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MergeableScanTask; @@ -92,8 +91,7 @@ public static CloseableIterable planTasks( Function weightFunc = file -> Math.max( - file.length() - + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(), + file.length() + ContentFileUtil.contentSizeInBytes(file.deletes()), (1 + file.deletes().size()) * openFileCost); return CloseableIterable.transform( diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java index f66496ae6624..e1c8ce9ccfed 100644 --- a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java +++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java @@ -101,6 +101,24 @@ public static DeleteFile generateEqualityDeleteFile(Table table, StructLike part .build(); } + public static DeleteFile generateDV(Table table, DataFile dataFile) { + PartitionSpec spec = table.specs().get(dataFile.specId()); + long fileSize = generateFileSize(); + long cardinality = generateRowCount(); + long offset = generateContentOffset(); + long length = generateContentLength(); + return FileMetadata.deleteFileBuilder(spec) + 
.ofPositionDeletes() + .withPath("/path/to/delete-" + UUID.randomUUID() + ".puffin") + .withFileSizeInBytes(fileSize) + .withPartition(dataFile.partition()) + .withRecordCount(cardinality) + .withReferencedDataFile(dataFile.location()) + .withContentOffset(offset) + .withContentSizeInBytes(length) + .build(); + } + public static DeleteFile generatePositionDeleteFile(Table table, DataFile dataFile) { PartitionSpec spec = table.spec(); StructLike partition = dataFile.partition(); @@ -229,6 +247,14 @@ private static long generateFileSize() { return random().nextInt(50_000); } + private static long generateContentOffset() { + return random().nextInt(1_000_000); + } + + private static long generateContentLength() { + return random().nextInt(10_000); + } + private static Pair generateBounds(PrimitiveType type, MetricsMode mode) { Comparator cmp = Comparators.forType(type); Object value1 = generateBound(type, mode); diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java index 45441631900c..9813d02910a6 100644 --- a/core/src/test/java/org/apache/iceberg/TestBase.java +++ b/core/src/test/java/org/apache/iceberg/TestBase.java @@ -666,6 +666,10 @@ protected DeleteFile newDeleteFileWithRef(DataFile dataFile) { .build(); } + protected DeleteFile newDV(DataFile dataFile) { + return FileGenerationUtil.generateDV(table, dataFile); + } + protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int... 
fieldIds) { PartitionSpec spec = table.specs().get(specId); return FileMetadata.deleteFileBuilder(spec) diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java index fbe473931659..0c98e8448745 100644 --- a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java +++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java @@ -198,6 +198,7 @@ private static DataFile dataFileWithAllOptional(PartitionSpec spec) { private static Stream provideSpecAndDeleteFile() { return Stream.of( + Arguments.of(TestBase.SPEC, dv(TestBase.SPEC), dvJson()), Arguments.of( PartitionSpec.unpartitioned(), deleteFileWithRequiredOnly(PartitionSpec.unpartitioned()), @@ -233,7 +234,9 @@ private static DeleteFile deleteFileWithDataRef(PartitionSpec spec) { null, null, null, - "/path/to/data/file.parquet"); + "/path/to/data/file.parquet", + null, + null); } private static String deleteFileWithDataRefJson() { @@ -242,6 +245,32 @@ private static String deleteFileWithDataRefJson() { + "\"record-count\":10,\"referenced-data-file\":\"/path/to/data/file.parquet\"}"; } + private static DeleteFile dv(PartitionSpec spec) { + PartitionData partitionData = new PartitionData(spec.partitionType()); + partitionData.set(0, 4); + return new GenericDeleteFile( + spec.specId(), + FileContent.POSITION_DELETES, + "/path/to/delete.puffin", + FileFormat.PUFFIN, + partitionData, + 1234, + new Metrics(10L, null, null, null, null), + null, + null, + null, + null, + "/path/to/data/file.parquet", + 4L, + 40L); + } + + private static String dvJson() { + return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete.puffin\"," + + "\"file-format\":\"PUFFIN\",\"partition\":{\"1000\":4},\"file-size-in-bytes\":1234,\"record-count\":10," + + "\"referenced-data-file\":\"/path/to/data/file.parquet\",\"content-offset\":4,\"content-size-in-bytes\":40}"; + } + private static DeleteFile 
deleteFileWithRequiredOnly(PartitionSpec spec) { PartitionData partitionData = null; if (spec.isPartitioned()) { @@ -261,6 +290,8 @@ private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) { null, null, null, + null, + null, null); } @@ -301,6 +332,8 @@ private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) { 1, Collections.singletonList(128L), ByteBuffer.wrap(new byte[16]), + null, + null, null); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java index 1f29c0e5b85c..01d38dc129c9 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java @@ -111,6 +111,8 @@ public class TestManifestEncryption { SORT_ORDER_ID, null, CONTENT_KEY_METADATA, + null, + null, null); private static final EncryptionManager ENCRYPTION_MANAGER = diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index 4652da943003..63c6779298e0 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -130,7 +130,7 @@ public void testDataFilePositions() throws IOException { long expectedPos = 0L; for (DataFile file : reader) { assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos); - assertThat(((BaseFile) file).get(18)) + assertThat(((BaseFile) file).get(20)) .as("Position from field index should match") .isEqualTo(expectedPos); expectedPos += 1; @@ -158,7 +158,7 @@ public void testDeleteFilePositions() throws IOException { long expectedPos = 0L; for (DeleteFile file : reader) { assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos); - assertThat(((BaseFile) file).get(18)) + assertThat(((BaseFile) file).get(20)) .as("Position from field index should match") 
.isEqualTo(expectedPos); expectedPos += 1; @@ -199,6 +199,30 @@ public void testDeleteFilesWithReferences() throws IOException { } } + @TestTemplate + public void testDVs() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + DeleteFile dv1 = newDV(FILE_A); + DeleteFile dv2 = newDV(FILE_B); + ManifestFile manifest = writeDeleteManifest(formatVersion, 1000L, dv1, dv2); + try (ManifestReader reader = + ManifestFiles.readDeleteManifest(manifest, FILE_IO, table.specs())) { + for (DeleteFile dv : reader) { + if (dv.location().equals(dv1.location())) { + assertThat(dv.location()).isEqualTo(dv1.location()); + assertThat(dv.referencedDataFile()).isEqualTo(FILE_A.location()); + assertThat(dv.contentOffset()).isEqualTo(dv1.contentOffset()); + assertThat(dv.contentSizeInBytes()).isEqualTo(dv1.contentSizeInBytes()); + } else { + assertThat(dv.location()).isEqualTo(dv2.location()); + assertThat(dv.referencedDataFile()).isEqualTo(FILE_B.location()); + assertThat(dv.contentOffset()).isEqualTo(dv2.contentOffset()); + assertThat(dv.contentSizeInBytes()).isEqualTo(dv2.contentSizeInBytes()); + } + } + } + } + @TestTemplate public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException { DataFile invalidOffset = diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 88dcc6ff9ca4..9abe7c426f32 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -97,6 +97,8 @@ public class TestManifestWriterVersions { SORT_ORDER_ID, null, null, + null, + null, null); @TempDir private Path temp; diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java index eb713a4d2e0b..8f8343733525 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java +++ 
b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java @@ -31,6 +31,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MergeableScanTask; import org.apache.iceberg.MockFileScanTask; @@ -74,6 +75,13 @@ private DataFile dataFileWithSize(long size) { return mockFile; } + private DeleteFile dvWithSize(long size) { + DeleteFile mockDeleteFile = Mockito.mock(DeleteFile.class); + Mockito.when(mockDeleteFile.format()).thenReturn(FileFormat.PUFFIN); + Mockito.when(mockDeleteFile.contentSizeInBytes()).thenReturn(size); + return mockDeleteFile; + } + private DeleteFile[] deleteFilesWithSizes(long... sizes) { return Arrays.stream(sizes) .mapToObj( @@ -85,6 +93,14 @@ private DeleteFile[] deleteFilesWithSizes(long... sizes) { .toArray(DeleteFile[]::new); } + @Test + public void testFileScanTaskSizeEstimation() { + DataFile dataFile = dataFileWithSize(100L); + DeleteFile dv = dvWithSize(20L); + MockFileScanTask task = new MockFileScanTask(dataFile, new DeleteFile[] {dv}); + assertThat(task.sizeBytes()).isEqualTo(120L); + } + @Test public void testPlanTaskWithDeleteFiles() { List testFiles = diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 9a2f57181708..10861302fd24 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.util.ContentFileUtil; class CommitSummary { @@ -50,7 +51,8 @@ class CommitSummary { .forEach( deleteFile -> { 
deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); - deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes()); + long deleteBytes = ContentFileUtil.contentSizeInBytes(deleteFile); + deleteFilesByteCount.addAndGet(deleteBytes); }); }); } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java index ce2a6c583fdf..ee0359b0eb41 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MetricGroup; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.util.ContentFileUtil; class IcebergStreamWriterMetrics { // 1,024 reservoir size should cost about 8KB, which is quite small. @@ -79,7 +80,7 @@ void updateFlushResult(WriteResult result) { Arrays.stream(result.deleteFiles()) .forEach( deleteFile -> { - deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes()); + deleteFilesSizeHistogram.update(ContentFileUtil.contentSizeInBytes(deleteFile)); }); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 9a2f57181708..10861302fd24 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.util.ContentFileUtil; class CommitSummary { @@ -50,7 +51,8 @@ class CommitSummary { .forEach( deleteFile -> { 
deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); - deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes()); + long deleteBytes = ContentFileUtil.contentSizeInBytes(deleteFile); + deleteFilesByteCount.addAndGet(deleteBytes); }); }); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java index ce2a6c583fdf..ee0359b0eb41 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MetricGroup; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.util.ContentFileUtil; class IcebergStreamWriterMetrics { // 1,024 reservoir size should cost about 8KB, which is quite small. @@ -79,7 +80,7 @@ void updateFlushResult(WriteResult result) { Arrays.stream(result.deleteFiles()) .forEach( deleteFile -> { - deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes()); + deleteFilesSizeHistogram.update(ContentFileUtil.contentSizeInBytes(deleteFile)); }); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 9a2f57181708..10861302fd24 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.util.ContentFileUtil; class CommitSummary { @@ -50,7 +51,8 @@ class CommitSummary { .forEach( deleteFile -> { 
deleteFilesRecordCount.addAndGet(deleteFile.recordCount()); - deleteFilesByteCount.addAndGet(deleteFile.fileSizeInBytes()); + long deleteBytes = ContentFileUtil.contentSizeInBytes(deleteFile); + deleteFilesByteCount.addAndGet(deleteBytes); }); }); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java index ce2a6c583fdf..ee0359b0eb41 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java @@ -26,6 +26,7 @@ import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.MetricGroup; import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.util.ContentFileUtil; class IcebergStreamWriterMetrics { // 1,024 reservoir size should cost about 8KB, which is quite small. @@ -79,7 +80,7 @@ void updateFlushResult(WriteResult result) { Arrays.stream(result.deleteFiles()) .forEach( deleteFile -> { - deleteFilesSizeHistogram.update(deleteFile.fileSizeInBytes()); + deleteFilesSizeHistogram.update(ContentFileUtil.contentSizeInBytes(deleteFile)); }); }