From 66f8307db8f224915a1051e0e167d5fe68b94b65 Mon Sep 17 00:00:00 2001
From: Amogh Jahagirdar
Date: Tue, 5 Nov 2024 11:06:16 -0700
Subject: [PATCH] Core: Change delete file granularity to file

---
 .../main/java/org/apache/iceberg/TableMetadata.java   |  2 ++
 .../main/java/org/apache/iceberg/TableProperties.java |  2 ++
 .../apache/iceberg/spark/extensions/TestDelete.java   |  9 +++++++--
 .../apache/iceberg/spark/extensions/TestUpdate.java   | 11 ++++++++---
 .../apache/iceberg/spark/extensions/TestDelete.java   | 10 +++++++---
 .../apache/iceberg/spark/extensions/TestUpdate.java   | 10 +++++++---
 6 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 3cdc53995dce..f9822243dbf9 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -90,6 +90,8 @@ private static Map<String, String> persistedProperties(Map<String, String> rawPr
     persistedProperties.put(
         TableProperties.PARQUET_COMPRESSION,
         TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
+    persistedProperties.put(
+        TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0);
 
     rawProperties.entrySet().stream()
         .filter(entry -> !TableProperties.RESERVED_PROPERTIES.contains(entry.getKey()))
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index c137bcd3a2c3..4633bae922bf 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -354,6 +354,8 @@ private TableProperties() {}
 
   public static final String DELETE_GRANULARITY = "write.delete.granularity";
   public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();
+  public static final String DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0 =
+      DeleteGranularity.FILE.toString();
 
   public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
   public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index bd70243fa0d5..be71093c0eaa 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -58,6 +58,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -154,7 +155,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
   }
 
   @Test
-  public void testCoalesceDelete() throws Exception {
+  public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
     createAndInitUnpartitionedTable();
 
     Employee[] employees = new Employee[100];
@@ -168,18 +169,22 @@ public void testCoalesceDelete() throws Exception {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
 
     // enable AQE and set the advisory partition size big enough to trigger combining
     // set the number of shuffle partitions to 200 to distribute the work across reducers
+    // set the advisory partition size for shuffles small enough to ensure writes override it
     withSQLConf(
         ImmutableMap.of(
             SQLConf.SHUFFLE_PARTITIONS().key(), "200",
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index e84789de7abc..568109879b08 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -56,6 +56,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -135,7 +136,7 @@ public void testUpdateWithVectorizedReads() {
   }
 
   @Test
-  public void testCoalesceUpdate() {
+  public void testCoalesceUpdateWithPartitionGranularityDeletes() {
     createAndInitTable("id INT, dep STRING");
 
     String[] records = new String[100];
@@ -149,18 +150,22 @@ public void testCoalesceUpdate() {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
 
     // enable AQE and set the advisory partition size big enough to trigger combining
     // set the number of shuffle partitions to 200 to distribute the work across reducers
+    // set the advisory partition size for shuffles small enough to ensure writes override it
     withSQLConf(
         ImmutableMap.of(
             SQLConf.SHUFFLE_PARTITIONS().key(), "200",
@@ -444,7 +449,7 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
     }
 
     assertEquals(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 42eb2af774e9..fc5135cb13df 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -59,6 +59,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -133,7 +134,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
   }
 
   @TestTemplate
-  public void testCoalesceDelete() throws Exception {
+  public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
     createAndInitUnpartitionedTable();
 
     Employee[] employees = new Employee[100];
@@ -147,12 +148,15 @@ public void testCoalesceDelete() throws Exception {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             DELETE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -1293,7 +1297,7 @@ public void testDeleteWithMultipleSpecs() {
     if (mode(table) == COPY_ON_WRITE) {
       validateCopyOnWrite(currentSnapshot, "3", "4", "1");
     } else {
-      validateMergeOnRead(currentSnapshot, "3", "3", null);
+      validateMergeOnRead(currentSnapshot, "3", "4", null);
     }
 
     assertEquals(
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 09aa51f0460a..c06a2b11644b 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -57,6 +57,7 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -116,7 +117,7 @@ public void testUpdateWithVectorizedReads() {
   }
 
   @TestTemplate
-  public void testCoalesceUpdate() {
+  public void testCoalesceUpdateWithPartitionGranularityDeletes() {
     createAndInitTable("id INT, dep STRING");
 
     String[] records = new String[100];
@@ -130,12 +131,15 @@ public void testCoalesceUpdate() {
 
     // set the open file cost large enough to produce a separate scan task per file
     // use range distribution to trigger a shuffle
+    // set partitioned scoped deletes so that 1 delete file is written as part of the output task
     Map<String, String> tableProps =
         ImmutableMap.of(
             SPLIT_OPEN_FILE_COST,
             String.valueOf(Integer.MAX_VALUE),
             UPDATE_DISTRIBUTION_MODE,
-            DistributionMode.RANGE.modeName());
+            DistributionMode.RANGE.modeName(),
+            TableProperties.DELETE_GRANULARITY,
+            DeleteGranularity.PARTITION.toString());
     sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));
 
     createBranchIfNeeded();
@@ -437,7 +441,7 @@ public void testUpdateWithoutCondition() {
       validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
       validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
     } else {
-      validateMergeOnRead(currentSnapshot, "2", "2", "2");
+      validateMergeOnRead(currentSnapshot, "2", "3", "2");
    }
 
     assertEquals(
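
---
Note for reviewers, illustrative only and not part of the patch: the change
flips the default that persistedProperties() stamps onto newly created tables;
existing tables keep whatever granularity they already persist. A table that
still wants one delete file per spec/partition after upgrading can pin the old
behavior explicitly through the public UpdateProperties API. A minimal sketch,
assuming a Table handle loaded from a catalog; the class and method names here
are hypothetical helpers, not anything introduced by this commit:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.deletes.DeleteGranularity;

    /** Hypothetical helper; `table` is assumed to come from a catalog, e.g. catalog.loadTable(ident). */
    class PinPartitionGranularity {
      static void pin(Table table) {
        // Persist write.delete.granularity=partition so this table keeps the
        // pre-1.8.0 behavior regardless of the new default for fresh tables.
        table
            .updateProperties()
            .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
            .commit();
      }
    }

The Spark SQL equivalent, as exercised by the tests above, is roughly
ALTER TABLE t SET TBLPROPERTIES ('write.delete.granularity' = 'partition').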