Core: Change delete file granularity to file
amogh-jahagirdar committed Nov 6, 2024
1 parent ad24d4b commit 66f8307
Showing 6 changed files with 33 additions and 11 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -90,6 +90,8 @@ private static Map<String, String> persistedProperties(Map<String, String> rawProperties) {
     persistedProperties.put(
         TableProperties.PARQUET_COMPRESSION,
         TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
+    persistedProperties.put(
+        TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0);
 
     rawProperties.entrySet().stream()
         .filter(entry -> !TableProperties.RESERVED_PROPERTIES.contains(entry.getKey()))
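The hunk above is what makes the new default apply only to newly created tables: persistedProperties stamps engine defaults into table metadata at creation time, so pre-1.8.0 tables, whose metadata lacks the key, keep resolving to the old partition default. A minimal sketch of that resolution, using a plain map in place of real table metadata (PropertyUtil is the real org.apache.iceberg.util helper):

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.PropertyUtil;

class GranularityDefaultSketch {
  public static void main(String[] args) {
    // Stands in for the persisted properties of a table created on 1.8.0+;
    // a pre-1.8.0 table simply has no "write.delete.granularity" entry.
    Map<String, String> props =
        ImmutableMap.of(
            TableProperties.DELETE_GRANULARITY,
            TableProperties.DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0);

    // Readers fall back to the legacy "partition" default when the key is absent.
    String granularity =
        PropertyUtil.propertyAsString(
            props, TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT);
    System.out.println(granularity); // "file" here; "partition" for older tables
  }
}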
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -354,6 +354,8 @@ private TableProperties() {}
 
   public static final String DELETE_GRANULARITY = "write.delete.granularity";
   public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();
+  public static final String DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0 =
+      DeleteGranularity.FILE.toString();
 
   public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
   public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
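Note that DELETE_GRANULARITY_DEFAULT deliberately keeps "partition": it is the fallback for tables whose metadata predates this change, while the new SINCE_1_8_0 constant is only written into fresh tables. A table can be moved back to partition-scoped deletes through the standard UpdateProperties API; a hedged sketch, with the table handle as a placeholder:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

class RevertGranularitySketch {
  // "table" is any loaded org.apache.iceberg.Table, e.g. obtained from a catalog.
  static void revertToPartitionGranularity(Table table) {
    table
        .updateProperties()
        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
        .commit();
  }
}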
@@ -58,6 +58,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -154,7 +155,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
   }
 
   @Test
-  public void testCoalesceDelete() throws Exception {
+  public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
     createAndInitUnpartitionedTable();
 
     Employee[] employees = new Employee[100];
@@ -168,18 +169,22 @@ public void testCoalesceDelete() throws Exception {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
 
     // enable AQE and set the advisory partition size big enough to trigger combining
     // set the number of shuffle partitions to 200 to distribute the work across reducers
+    // set the advisory partition size for shuffles small enough to ensure writes override it
     withSQLConf(
         ImmutableMap.of(
             SQLConf.SHUFFLE_PARTITIONS().key(), "200",
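The renamed coalesce test pins the granularity back to PARTITION because its assertions rely on exactly one delete file being written per output task; under the new file-scoped default, each data file referenced by a task would get its own delete file and the counts would no longer match. The ALTER TABLE that tablePropsAsString expands to is roughly the following ("db.tbl" is a placeholder table name):

import org.apache.spark.sql.SparkSession;

class PinPartitionGranularitySketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();
    // The property key and value are the real ones the test sets; only the table name is made up.
    spark.sql("ALTER TABLE db.tbl SET TBLPROPERTIES ('write.delete.granularity' = 'partition')");
  }
}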
@@ -56,6 +56,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -135,7 +136,7 @@ public void testUpdateWithVectorizedReads() {
   }
 
   @Test
-  public void testCoalesceUpdate() {
+  public void testCoalesceUpdateWithPartitionGranularityDeletes() {
     createAndInitTable("id INT, dep STRING");
 
     String[] records = new String[100];
@@ -149,18 +150,22 @@ public void testCoalesceUpdate() {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
 
     // enable AQE and set the advisory partition size big enough to trigger combining
     // set the number of shuffle partitions to 200 to distribute the work across reducers
+    // set the advisory partition size for shuffles small enough to ensure writes override it
     withSQLConf(
         ImmutableMap.of(
             SQLConf.SHUFFLE_PARTITIONS().key(), "200",
@@ -444,7 +449,7 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     }
 
     assertEquals(
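The changed expectation in testUpdateWithoutCondition is the observable effect of the new default: the added-delete-files count rises from 2 to 3 because file granularity writes one position delete file per touched data file, while the changed-partition and added-data-file counts stay the same. A sketch of where that number surfaces, assuming a placeholder table handle and that "added-delete-files" is the snapshot summary key the validateMergeOnRead helper checks:

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

class DeleteFileCountSketch {
  // "table" is a placeholder handle for the table the test writes to.
  static void printAddedDeleteFiles(Table table) {
    Snapshot snap = table.currentSnapshot();
    // One position delete file per touched data file under file granularity,
    // so the same update now reports "3" where it previously reported "2".
    System.out.println(snap.summary().get("added-delete-files"));
  }
}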
@@ -59,6 +59,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -133,7 +134,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
   }
 
   @TestTemplate
-  public void testCoalesceDelete() throws Exception {
+  public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
     createAndInitUnpartitionedTable();
 
     Employee[] employees = new Employee[100];
@@ -147,12 +148,15 @@ public void testCoalesceDelete() throws Exception {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -1293,7 +1297,7 @@ public void testDeleteWithMultipleSpecs() {
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "3", "4", "1");
     } else {
-      validateMergeOnRead(currentSnapshot, "3", "3", null);
+      validateMergeOnRead(currentSnapshot, "3", "4", null);
    }
 
     assertEquals(
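Likewise, testDeleteWithMultipleSpecs now expects 4 added delete files instead of 3 for the same three changed partitions, again one delete file per referenced data file. The strings persisted for this property are produced and parsed by the DeleteGranularity enum; a small round-trip sketch, assuming the enum exposes the fromString/toString pair used for this property (the printed checks are mine):

import org.apache.iceberg.deletes.DeleteGranularity;

class GranularityRoundTripSketch {
  public static void main(String[] args) {
    // toString() yields the persisted forms "file" and "partition";
    // fromString() parses them back when the table property is read.
    DeleteGranularity parsed = DeleteGranularity.fromString("file");
    System.out.println(parsed == DeleteGranularity.FILE); // true
    System.out.println(DeleteGranularity.PARTITION);      // prints "partition"
  }
}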
@@ -57,6 +57,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -116,7 +117,7 @@ public void testUpdateWithVectorizedReads() {
   }
 
   @TestTemplate
-  public void testCoalesceUpdate() {
+  public void testCoalesceUpdateWithPartitionGranularityDeletes() {
     createAndInitTable("id INT, dep STRING");
 
     String[] records = new String[100];
@@ -130,12 +131,15 @@ public void testCoalesceUpdate() {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -437,7 +441,7 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     }
 
     assertEquals(
