Skip to content

Commit

Permalink
API, Core: Add content offset and size to DeleteFile
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Nov 2, 2024
1 parent d9b9768 commit 76a372d
Show file tree
Hide file tree
Showing 32 changed files with 349 additions and 31 deletions.
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ContentFileUtil;

/**
* A scan task for inserts generated by adding a data file to the table.
Expand Down Expand Up @@ -55,7 +56,7 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ContentFileUtil.contentSizeInBytes(deletes());
}

@Override
Expand Down
15 changes: 13 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,21 @@ public interface DataFile extends ContentFile<DataFile> {
"referenced_data_file",
StringType.get(),
"Fully qualified location (URI with FS scheme) of a data file that all deletes reference");
Types.NestedField CONTENT_OFFSET =
optional(
144, "content_offset", LongType.get(), "The offset in the file where the content starts");
Types.NestedField CONTENT_SIZE =
optional(
145,
"content_size_in_bytes",
LongType.get(),
"The length of referenced content stored in the file");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

// NEXT ID TO ASSIGN: 144
// NEXT ID TO ASSIGN: 146

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -131,7 +140,9 @@ static StructType getType(StructType partitionType) {
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID,
REFERENCED_DATA_FILE);
REFERENCED_DATA_FILE,
CONTENT_OFFSET,
CONTENT_SIZE);
}

/**
Expand Down
22 changes: 22 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,26 @@ default List<Long> splitOffsets() {
default String referencedDataFile() {
return null;
}

/**
* Returns the offset in the file where the content starts.
*
* <p>The content offset is required for deletion vectors and points to the start of the deletion
* vector blob in the Puffin file, enabling direct access. This method always returns null for
* equality and position delete files.
*/
default Long contentOffset() {
return null;
}

/**
* Returns the length of referenced content stored in the file.
*
* <p>The content size is required for deletion vectors and indicates the size of the deletion
* vector blob in the Puffin file, enabling direct access. This method always returns null for
* equality and position delete files.
*/
default Long contentSizeInBytes() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ContentFileUtil;

/**
* A scan task for deletes generated by removing a data file from the table.
Expand Down Expand Up @@ -54,7 +55,7 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length() + existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ContentFileUtil.contentSizeInBytes(existingDeletes());
}

@Override
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ContentFileUtil;

/**
* A scan task for deletes generated by adding delete files to the table.
Expand Down Expand Up @@ -63,9 +64,9 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length()
+ addedDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum()
+ existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
long addedDeletesSize = ContentFileUtil.contentSizeInBytes(addedDeletes());
long existingDeletesSize = ContentFileUtil.contentSizeInBytes(existingDeletes());
return length() + addedDeletesSize + existingDeletesSize;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

/** Enum of supported file formats. */
public enum FileFormat {
PUFFIN("puffin", false),
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/FileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ContentFileUtil;

/** A scan task over a range of bytes in a single data file. */
public interface FileScanTask extends ContentScanTask<DataFile>, SplittableScanTask<FileScanTask> {
Expand All @@ -36,7 +37,7 @@ default Schema schema() {

@Override
default long sizeBytes() {
return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ContentFileUtil.contentSizeInBytes(deletes());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -84,4 +85,25 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
CharSequence location = referencedDataFile(deleteFile);
return location != null ? location.toString() : null;
}

public static long contentSizeInBytes(ContentFile<?> file) {
if (file.content() == FileContent.DATA) {
return file.fileSizeInBytes();
} else {
DeleteFile deleteFile = (DeleteFile) file;
return isDV(deleteFile) ? deleteFile.contentSizeInBytes() : deleteFile.fileSizeInBytes();
}
}

public static long contentSizeInBytes(Iterable<? extends ContentFile<?>> files) {
long size = 0L;
for (ContentFile<?> file : files) {
size += contentSizeInBytes(file);
}
return size;
}

public static boolean isDV(DeleteFile deleteFile) {
return deleteFile.format() == FileFormat.PUFFIN;
}
}
7 changes: 4 additions & 3 deletions api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ public boolean equals(Object o) {
}

DeleteFileWrapper that = (DeleteFileWrapper) o;
// this needs to be updated once deletion vector support is added
return Objects.equals(file.location(), that.file.location());
return Objects.equals(file.location(), that.file.location())
&& Objects.equals(file.contentOffset(), that.file.contentOffset())
&& Objects.equals(file.contentSizeInBytes(), that.file.contentSizeInBytes());
}

@Override
public int hashCode() {
return Objects.hashCode(file.location());
return Objects.hash(file.location(), file.contentOffset(), file.contentSizeInBytes());
}

@Override
Expand Down
56 changes: 56 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/TestContentFileUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestContentFileUtil {

@Test
public void testContentSize() {
DeleteFile dv1 = mockDV("dv1.puffin", 20L, 25L, "data1.parquet");
DeleteFile dv2 = mockDV("dv2.puffin", 4L, 15L, "data2.parquet");

long size1 = ContentFileUtil.contentSizeInBytes(ImmutableList.of());
assertThat(size1).isEqualTo(0);

long size2 = ContentFileUtil.contentSizeInBytes(ImmutableList.of(dv1));
assertThat(size2).isEqualTo(25L);

long size3 = ContentFileUtil.contentSizeInBytes(ImmutableList.of(dv1, dv2));
assertThat(size3).isEqualTo(40L);
}

private static DeleteFile mockDV(
String location, long contentOffset, long contentSize, String referencedDataFile) {
DeleteFile mockFile = Mockito.mock(DeleteFile.class);
Mockito.when(mockFile.format()).thenReturn(FileFormat.PUFFIN);
Mockito.when(mockFile.location()).thenReturn(location);
Mockito.when(mockFile.contentOffset()).thenReturn(contentOffset);
Mockito.when(mockFile.contentSizeInBytes()).thenReturn(contentSize);
Mockito.when(mockFile.referencedDataFile()).thenReturn(referencedDataFile);
return mockFile;
}
}
32 changes: 31 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public PartitionData copy() {
private byte[] keyMetadata = null;
private Integer sortOrderId;
private String referencedDataFile = null;
private Long contentOffset = null;
private Long contentSizeInBytes = null;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -110,6 +112,8 @@ public PartitionData copy() {
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE,
DataFile.CONTENT_OFFSET,
DataFile.CONTENT_SIZE,
MetadataColumns.ROW_POSITION);

/** Used by Avro reflection to instantiate this class when reading manifest files. */
Expand Down Expand Up @@ -152,7 +156,9 @@ public PartitionData copy() {
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata,
String referencedDataFile) {
String referencedDataFile,
Long contentOffset,
Long contentSizeInBytes) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
Expand Down Expand Up @@ -182,6 +188,8 @@ public PartitionData copy() {
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.referencedDataFile = referencedDataFile;
this.contentOffset = contentOffset;
this.contentSizeInBytes = contentSizeInBytes;
}

/**
Expand Down Expand Up @@ -235,6 +243,8 @@ public PartitionData copy() {
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
this.contentSizeInBytes = toCopy.contentSizeInBytes;
}

/** Constructor for Java serialization. */
Expand Down Expand Up @@ -347,6 +357,12 @@ protected <T> void internalSet(int pos, T value) {
this.referencedDataFile = value != null ? value.toString() : null;
return;
case 18:
this.contentOffset = (Long) value;
return;
case 19:
this.contentSizeInBytes = (Long) value;
return;
case 20:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -398,6 +414,10 @@ private Object getByPos(int basePos) {
case 17:
return referencedDataFile;
case 18:
return contentOffset;
case 19:
return contentSizeInBytes;
case 20:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
Expand Down Expand Up @@ -528,6 +548,14 @@ public String referencedDataFile() {
return referencedDataFile;
}

public Long contentOffset() {
return contentOffset;
}

public Long contentSizeInBytes() {
return contentSizeInBytes;
}

private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys);
}
Expand Down Expand Up @@ -580,6 +608,8 @@ public String toString() {
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
.add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes)
.toString();
}
}
9 changes: 3 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.ContentFileUtil;

public class BaseFileScanTask extends BaseContentScanTask<FileScanTask, DataFile>
implements FileScanTask {
Expand Down Expand Up @@ -79,7 +80,7 @@ private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && deletes.length > 0) {
long size = 0L;
for (DeleteFile deleteFile : deletes) {
size += deleteFile.fileSizeInBytes();
size += ContentFileUtil.contentSizeInBytes(deleteFile);
}
this.deletesSizeBytes = size;
}
Expand Down Expand Up @@ -180,11 +181,7 @@ public SplitScanTask merge(ScanTask other) {

private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && fileScanTask.filesCount() > 1) {
long size = 0L;
for (DeleteFile deleteFile : fileScanTask.deletes()) {
size += deleteFile.fileSizeInBytes();
}
this.deletesSizeBytes = size;
this.deletesSizeBytes = ContentFileUtil.contentSizeInBytes(fileScanTask.deletes());
}

return deletesSizeBytes;
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"key_metadata",
"split_offsets",
"referenced_data_file",
"content_offset",
"content_size_in_bytes",
"equality_ids");

protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
Expand Down
Loading

0 comments on commit 76a372d

Please sign in to comment.