From e4e1f0fa8be26845757b08f02348bbe2463d55c2 Mon Sep 17 00:00:00 2001 From: Amogh Jahagirdar Date: Tue, 5 Nov 2024 11:06:16 -0700 Subject: [PATCH] Core: Change delete file granularity to file --- .../org/apache/iceberg/TableMetadata.java | 2 ++ .../org/apache/iceberg/TableProperties.java | 2 ++ .../iceberg/spark/extensions/TestDelete.java | 35 ++++++++++++------- .../iceberg/spark/extensions/TestUpdate.java | 26 +++++++++----- .../iceberg/spark/extensions/TestDelete.java | 10 ++++-- .../iceberg/spark/extensions/TestUpdate.java | 10 ++++-- 6 files changed, 59 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 3cdc53995dce..f9822243dbf9 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -90,6 +90,8 @@ private static Map persistedProperties(Map rawPr persistedProperties.put( TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + persistedProperties.put( + TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0); rawProperties.entrySet().stream() .filter(entry -> !TableProperties.RESERVED_PROPERTIES.contains(entry.getKey())) diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index c137bcd3a2c3..4633bae922bf 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -354,6 +354,8 @@ private TableProperties() {} public static final String DELETE_GRANULARITY = "write.delete.granularity"; public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString(); + public static final String DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0 = + DeleteGranularity.FILE.toString(); public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level"; public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable"; diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index bd70243fa0d5..0b890611f37a 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -153,8 +153,8 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException { sql("SELECT * FROM %s ORDER BY id ASC", selectTarget())); } - @Test - public void testCoalesceDelete() throws Exception { + @TestTemplate + public void testCoalesceDeleteWithPartitionGranularity() throws Exception { createAndInitUnpartitionedTable(); Employee[] employees = new Employee[100]; @@ -168,24 +168,34 @@ public void testCoalesceDelete() throws Exception { // set the open file cost large enough to produce a separate scan task per file // use range distribution to trigger a shuffle + // set partitioned scoped deletes so that 1 delete file is written as part of the output task Map tableProps = ImmutableMap.of( SPLIT_OPEN_FILE_COST, String.valueOf(Integer.MAX_VALUE), DELETE_DISTRIBUTION_MODE, - DistributionMode.RANGE.modeName()); + DistributionMode.RANGE.modeName(), + TableProperties.DELETE_GRANULARITY, + DeleteGranularity.PARTITION.toString()); sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", 
tableName, tablePropsAsString(tableProps)); createBranchIfNeeded(); // enable AQE and set the advisory partition size big enough to trigger combining // set the number of shuffle partitions to 200 to distribute the work across reducers + // set the advisory partition size for shuffles small enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "200", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "200", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.COALESCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "100", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + String.valueOf(256 * 1024 * 1024)), () -> { SparkPlan plan = executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget()); @@ -210,8 +220,9 @@ public void testCoalesceDelete() throws Exception { validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1"); } - Assert.assertEquals( - "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget())); + assertThat(scalarSql("SELECT COUNT(*) FROM %s", commitTarget())) + .as("Row count must match") + .isEqualTo(200L); } @Test @@ -1256,7 +1267,7 @@ public void testDeleteRefreshesRelationCache() throws NoSuchTableException { spark.sql("UNCACHE TABLE tmp"); } - @Test + @TestTemplate public void testDeleteWithMultipleSpecs() { createAndInitTable("id INT, dep STRING, category STRING"); @@ -1282,13 +1293,13 @@ public void testDeleteWithMultipleSpecs() { sql("DELETE FROM %s WHERE id IN (1, 3, 5, 7)", commitTarget()); Table table = validationCatalog.loadTable(tableIdent); - Assert.assertEquals("Should have 5 snapshots", 5, Iterables.size(table.snapshots())); + assertThat(table.snapshots()).as("Should have 5 snapshots").hasSize(5); Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch); if (mode(table) == COPY_ON_WRITE) { validateCopyOnWrite(currentSnapshot, "3", "4", "1"); } else { - validateMergeOnRead(currentSnapshot, "3", "3", null); + validateMergeOnRead(currentSnapshot, "3", "4", null); } assertEquals( diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index e84789de7abc..e0b0e146508b 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -134,8 +134,8 @@ public void testUpdateWithVectorizedReads() { sql("SELECT * FROM %s ORDER BY id", selectTarget())); } - @Test - public void testCoalesceUpdate() { + @TestTemplate + public void testCoalesceUpdateWithPartitionGranularityDeletes() { createAndInitTable("id INT, dep STRING"); String[] records = new String[100]; @@ -149,24 +149,34 @@ public void testCoalesceUpdate() { // set the open file cost large enough to produce a separate scan task per file // use range distribution to trigger a shuffle + // set partitioned scoped deletes so that 1 delete file is written as part of the output task Map tableProps = ImmutableMap.of( SPLIT_OPEN_FILE_COST, String.valueOf(Integer.MAX_VALUE), UPDATE_DISTRIBUTION_MODE, - DistributionMode.RANGE.modeName()); + DistributionMode.RANGE.modeName(), + TableProperties.DELETE_GRANULARITY, + 
DeleteGranularity.PARTITION.toString()); sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps)); createBranchIfNeeded(); // enable AQE and set the advisory partition size big enough to trigger combining // set the number of shuffle partitions to 200 to distribute the work across reducers + // set the advisory partition size for shuffles small enough to ensure writes override it withSQLConf( ImmutableMap.of( - SQLConf.SHUFFLE_PARTITIONS().key(), "200", - SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true", - SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true", - SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"), + SQLConf.SHUFFLE_PARTITIONS().key(), + "200", + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), + "true", + SQLConf.COALESCE_PARTITIONS_ENABLED().key(), + "true", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), + "100", + SparkSQLProperties.ADVISORY_PARTITION_SIZE, + String.valueOf(256 * 1024 * 1024)), () -> { SparkPlan plan = executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget()); @@ -444,7 +454,7 @@ public void testUpdateWithoutCondition() { validateProperty(currentSnapshot, DELETED_FILES_PROP, "3"); validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3")); } else { - validateMergeOnRead(currentSnapshot, "2", "2", "2"); + validateMergeOnRead(currentSnapshot, "2", "3", "2"); } assertEquals( diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index 42eb2af774e9..fc5135cb13df 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -59,6 +59,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -133,7 +134,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException { } @TestTemplate - public void testCoalesceDelete() throws Exception { + public void testCoalesceDeleteWithPartitionGranularity() throws Exception { createAndInitUnpartitionedTable(); Employee[] employees = new Employee[100]; @@ -147,12 +148,15 @@ public void testCoalesceDelete() throws Exception { // set the open file cost large enough to produce a separate scan task per file // use range distribution to trigger a shuffle + // set partitioned scoped deletes so that 1 delete file is written as part of the output task Map tableProps = ImmutableMap.of( SPLIT_OPEN_FILE_COST, String.valueOf(Integer.MAX_VALUE), DELETE_DISTRIBUTION_MODE, - DistributionMode.RANGE.modeName()); + DistributionMode.RANGE.modeName(), + TableProperties.DELETE_GRANULARITY, + DeleteGranularity.PARTITION.toString()); sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps)); createBranchIfNeeded(); @@ -1293,7 +1297,7 @@ public void testDeleteWithMultipleSpecs() { if (mode(table) == COPY_ON_WRITE) { validateCopyOnWrite(currentSnapshot, "3", "4", "1"); } else { - validateMergeOnRead(currentSnapshot, "3", "3", null); + validateMergeOnRead(currentSnapshot, "3", "4", null); } assertEquals( 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 09aa51f0460a..c06a2b11644b 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -57,6 +57,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -116,7 +117,7 @@ public void testUpdateWithVectorizedReads() { } @TestTemplate - public void testCoalesceUpdate() { + public void testCoalesceUpdateWithPartitionGranularityDeletes() { createAndInitTable("id INT, dep STRING"); String[] records = new String[100]; @@ -130,12 +131,15 @@ public void testCoalesceUpdate() { // set the open file cost large enough to produce a separate scan task per file // use range distribution to trigger a shuffle + // set partitioned scoped deletes so that 1 delete file is written as part of the output task Map tableProps = ImmutableMap.of( SPLIT_OPEN_FILE_COST, String.valueOf(Integer.MAX_VALUE), UPDATE_DISTRIBUTION_MODE, - DistributionMode.RANGE.modeName()); + DistributionMode.RANGE.modeName(), + TableProperties.DELETE_GRANULARITY, + DeleteGranularity.PARTITION.toString()); sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps)); createBranchIfNeeded(); @@ -437,7 +441,7 @@ public void testUpdateWithoutCondition() { validateProperty(currentSnapshot, DELETED_FILES_PROP, "3"); validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3")); } else { - validateMergeOnRead(currentSnapshot, "2", "2", "2"); + validateMergeOnRead(currentSnapshot, "2", "3", "2"); } assertEquals(
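
Note (illustrative, not part of the patch): with this change, newly created tables persist write.delete.granularity=file by default (DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0 is written into the table's persisted properties at creation time), while the coalesce tests above pin the old behavior explicitly via ALTER TABLE ... SET TBLPROPERTIES so they keep exercising partition-granularity deletes. As a minimal sketch of the same opt-out through the Java API (the class and method names below are made up for the example; it assumes an already-loaded Iceberg Table handle), a table can be switched back to partition-scoped deletes like this:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.deletes.DeleteGranularity;

    class DeleteGranularityExample {
      // Opt a table back into the pre-1.8.0 behavior of partition-scoped
      // delete files instead of the new file-scoped default.
      static void usePartitionScopedDeletes(Table table) {
        table
            .updateProperties()
            .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
            .commit();
      }
    }

Because the new default only lands in the persisted properties of tables created after this change, existing tables keep whatever granularity they already resolve to unless the property is set explicitly.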