Core: Change delete file granularity to file
amogh-jahagirdar committed Nov 6, 2024
1 parent ad24d4b commit e4e1f0f
Showing 6 changed files with 59 additions and 26 deletions.
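The core change below makes newly written table metadata persist write.delete.granularity with a file-scoped default (DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0), while keeping the old partition-scoped default constant. As a rough sketch of how a table could opt back into the previous behavior, the snippet sets the property explicitly through the table API; the class and method names are illustrative, and only the property constant and DeleteGranularity values come from this commit.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

public class PinPartitionGranularity {

  // Illustrative helper (not part of this commit): pins partition-scoped delete
  // files on a table, overriding the new file-scoped default used by 1.8.0+ writers.
  static void usePartitionScopedDeletes(Table table) {
    table
        .updateProperties()
        .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
        .commit();
  }
}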
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -90,6 +90,8 @@ private static Map<String, String> persistedProperties(Map<String, String> rawProperties) {
persistedProperties.put(
TableProperties.PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
persistedProperties.put(
TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0);

rawProperties.entrySet().stream()
.filter(entry -> !TableProperties.RESERVED_PROPERTIES.contains(entry.getKey()))
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -354,6 +354,8 @@ private TableProperties() {}

public static final String DELETE_GRANULARITY = "write.delete.granularity";
public static final String DELETE_GRANULARITY_DEFAULT = DeleteGranularity.PARTITION.toString();
public static final String DELETE_GRANULARITY_DEFAULT_SINCE_1_8_0 =
DeleteGranularity.FILE.toString();

public static final String DELETE_ISOLATION_LEVEL = "write.delete.isolation-level";
public static final String DELETE_ISOLATION_LEVEL_DEFAULT = "serializable";
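Note that DELETE_GRANULARITY_DEFAULT stays at partition, so code that falls back to it keeps the old behavior when the property is absent. A hedged sketch (not the actual writer code) of resolving the effective granularity from a table's property map; the class and method names are made up.

import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.deletes.DeleteGranularity;

public class GranularityResolution {

  // Illustrative only: resolves the effective delete granularity, falling back to
  // DELETE_GRANULARITY_DEFAULT (still "partition") when the property is not set.
  static DeleteGranularity effectiveGranularity(Map<String, String> tableProperties) {
    String value =
        tableProperties.getOrDefault(
            TableProperties.DELETE_GRANULARITY, TableProperties.DELETE_GRANULARITY_DEFAULT);
    return DeleteGranularity.FILE.toString().equalsIgnoreCase(value)
        ? DeleteGranularity.FILE
        : DeleteGranularity.PARTITION;
  }
}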
Changes to a Spark delete test file (file name not shown)
@@ -153,8 +153,8 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
sql("SELECT * FROM %s ORDER BY id ASC", selectTarget()));
}

@Test
public void testCoalesceDelete() throws Exception {
@TestTemplate
public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
createAndInitUnpartitionedTable();

Employee[] employees = new Employee[100];
@@ -168,24 +168,34 @@ public void testCoalesceDelete() throws Exception {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
// set partitioned scoped deletes so that 1 delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
DELETE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
DistributionMode.RANGE.modeName(),
TableProperties.DELETE_GRANULARITY,
DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", commitTarget());
@@ -210,8 +220,9 @@ public void testCoalesceDelete() throws Exception {
validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1");
}

Assert.assertEquals(
"Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", commitTarget()));
assertThat(scalarSql("SELECT COUNT(*) FROM %s", commitTarget()))
.as("Row count must match")
.isEqualTo(200L);
}

@Test
@@ -1256,7 +1267,7 @@ public void testDeleteRefreshesRelationCache() throws NoSuchTableException {
spark.sql("UNCACHE TABLE tmp");
}

@Test
@TestTemplate
public void testDeleteWithMultipleSpecs() {
createAndInitTable("id INT, dep STRING, category STRING");

@@ -1282,13 +1293,13 @@ public void testDeleteWithMultipleSpecs() {
sql("DELETE FROM %s WHERE id IN (1, 3, 5, 7)", commitTarget());

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 5 snapshots", 5, Iterables.size(table.snapshots()));
assertThat(table.snapshots()).as("Should have 5 snapshots").hasSize(5);

Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "3", "4", "1");
} else {
validateMergeOnRead(currentSnapshot, "3", "3", null);
validateMergeOnRead(currentSnapshot, "3", "4", null);
}

assertEquals(
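The delete tests above pin partition granularity per table through tablePropsAsString; a hedged Spark SQL equivalent is sketched below, with a hypothetical table name.

import org.apache.spark.sql.SparkSession;

public class SetDeleteGranularity {

  // Illustrative only: applies the same per-table override the tests use;
  // the table name db.events is hypothetical.
  static void pinPartitionGranularity(SparkSession spark) {
    spark.sql(
        "ALTER TABLE db.events SET TBLPROPERTIES ('write.delete.granularity'='partition')");
  }
}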
Changes to a Spark update test file (file name not shown)
@@ -134,8 +134,8 @@ public void testUpdateWithVectorizedReads() {
sql("SELECT * FROM %s ORDER BY id", selectTarget()));
}

@Test
public void testCoalesceUpdate() {
@TestTemplate
public void testCoalesceUpdateWithPartitionGranularityDeletes() {
createAndInitTable("id INT, dep STRING");

String[] records = new String[100];
@@ -149,24 +149,34 @@ public void testCoalesceUpdate() {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
// set partitioned scoped deletes so that 1 delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
UPDATE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
DistributionMode.RANGE.modeName(),
TableProperties.DELETE_GRANULARITY,
DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();

// enable AQE and set the advisory partition size big enough to trigger combining
// set the number of shuffle partitions to 200 to distribute the work across reducers
// set the advisory partition size for shuffles small enough to ensure writes override it
withSQLConf(
ImmutableMap.of(
SQLConf.SHUFFLE_PARTITIONS().key(), "200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(), "true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "256MB"),
SQLConf.SHUFFLE_PARTITIONS().key(),
"200",
SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
"true",
SQLConf.COALESCE_PARTITIONS_ENABLED().key(),
"true",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(),
"100",
SparkSQLProperties.ADVISORY_PARTITION_SIZE,
String.valueOf(256 * 1024 * 1024)),
() -> {
SparkPlan plan =
executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", commitTarget());
@@ -444,7 +454,7 @@ public void testUpdateWithoutCondition() {
validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
} else {
validateMergeOnRead(currentSnapshot, "2", "2", "2");
validateMergeOnRead(currentSnapshot, "2", "3", "2");
}

assertEquals(
Changes to another Spark delete test file (file name not shown)
@@ -59,6 +59,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -133,7 +134,7 @@ public void testDeleteWithVectorizedReads() throws NoSuchTableException {
}

@TestTemplate
public void testCoalesceDelete() throws Exception {
public void testCoalesceDeleteWithPartitionGranularity() throws Exception {
createAndInitUnpartitionedTable();

Employee[] employees = new Employee[100];
@@ -147,12 +148,15 @@ public void testCoalesceDelete() throws Exception {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
// set partitioned scoped deletes so that 1 delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
DELETE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
DistributionMode.RANGE.modeName(),
TableProperties.DELETE_GRANULARITY,
DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();
@@ -1293,7 +1297,7 @@ public void testDeleteWithMultipleSpecs() {
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "3", "4", "1");
} else {
validateMergeOnRead(currentSnapshot, "3", "3", null);
validateMergeOnRead(currentSnapshot, "3", "4", null);
}

assertEquals(
Changes to another Spark update test file (file name not shown)
@@ -57,6 +57,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -116,7 +117,7 @@ public void testUpdateWithVectorizedReads() {
}

@TestTemplate
public void testCoalesceUpdate() {
public void testCoalesceUpdateWithPartitionGranularityDeletes() {
createAndInitTable("id INT, dep STRING");

String[] records = new String[100];
@@ -130,12 +131,15 @@ public void testCoalesceUpdate() {

// set the open file cost large enough to produce a separate scan task per file
// use range distribution to trigger a shuffle
// set partitioned scoped deletes so that 1 delete file is written as part of the output task
Map<String, String> tableProps =
ImmutableMap.of(
SPLIT_OPEN_FILE_COST,
String.valueOf(Integer.MAX_VALUE),
UPDATE_DISTRIBUTION_MODE,
DistributionMode.RANGE.modeName());
DistributionMode.RANGE.modeName(),
TableProperties.DELETE_GRANULARITY,
DeleteGranularity.PARTITION.toString());
sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, tablePropsAsString(tableProps));

createBranchIfNeeded();
@@ -437,7 +441,7 @@ public void testUpdateWithoutCondition() {
validateProperty(currentSnapshot, DELETED_FILES_PROP, "3");
validateProperty(currentSnapshot, ADDED_FILES_PROP, ImmutableSet.of("2", "3"));
} else {
validateMergeOnRead(currentSnapshot, "2", "2", "2");
validateMergeOnRead(currentSnapshot, "2", "3", "2");
}

assertEquals(
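For orientation on the updated assertions above (for example validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "1") and the revised validateMergeOnRead counts): with file granularity a merge-on-read delete or update can add roughly one delete file per touched data file, whereas the partition-scoped tests expect a single delete file per output task. A hedged sketch of inspecting that count from the snapshot summary; the class name is illustrative.

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;

public class DeleteFileCountCheck {

  // Illustrative only: prints how many delete files the latest commit added,
  // read from the same snapshot summary property the tests assert on.
  static void printAddedDeleteFiles(Table table) {
    Snapshot current = table.currentSnapshot();
    if (current == null) {
      System.out.println("table has no snapshots");
      return;
    }
    String added = current.summary().getOrDefault(SnapshotSummary.ADDED_DELETE_FILES_PROP, "0");
    System.out.println("delete files added by last commit: " + added);
  }
}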
