diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a34e..e8a46c5becd7 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ public void write(RowData row) throws IOException {
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema);
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
     }
 
     @Override
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf41734..5e81c279b69b 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,18 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
@@ -45,6 +51,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +68,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
+  @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -233,4 +242,31 @@ public void testUpsertOnDataKey() throws Exception {
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
+
+  @Test
+  public void testDeleteStats() throws Exception {
+    assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+    assertThat(fromStat).isEqualTo(dataFile.path().toString());
+  }
 }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 40e0b5f2a34e..e8a46c5becd7 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ public void write(RowData row) throws IOException {
 
   protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     RowDataDeltaWriter(PartitionKey partition) {
-      super(partition, schema, deleteSchema);
+      super(partition, schema, deleteSchema, DeleteGranularity.FILE);
     }
 
     @Override
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index b5c3bcf41734..5e81c279b69b 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,12 +18,18 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
 import java.util.List;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
@@ -45,6 +51,7 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -61,6 +68,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
+  @Rule public final Timeout globalTimeout = Timeout.seconds(60);
+
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -233,4 +242,31 @@ public void testUpsertOnDataKey() throws Exception {
   public void testUpsertOnIdDataKey() throws Exception {
     testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
+
+  @Test
+  public void testDeleteStats() throws Exception {
+    assumeThat(format).isNotEqualTo(FileFormat.AVRO);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));
+
+    List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));
+
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        "main");
+
+    DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
+    String fromStat =
+        new String(
+            deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
+    DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
+    assertThat(fromStat).isEqualTo(dataFile.path().toString());
+  }
 }
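
Note on what the new test reads back: with DeleteGranularity.FILE, the equality-delta writer emits position delete files that each target a single data file, so the lower and upper bounds recorded for the delete file's file_path column collapse to that one data file's path. Below is a minimal standalone sketch of the same invariant check, not part of the patch: the class and method names are invented for illustration, a live Table handle is assumed, and only public Iceberg APIs are used (Snapshot#addedDeleteFiles, ContentFile#lowerBounds/upperBounds, MetadataColumns.DELETE_FILE_PATH). Column bounds are absent for Avro delete files, which is why testDeleteStats skips that format.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Table;

    // Hypothetical helper, for illustration only: returns true when every
    // position delete file added by the table's current snapshot is scoped to
    // exactly one data file, which is what DeleteGranularity.FILE guarantees.
    public class FileScopedDeleteCheck {
      static boolean allDeletesFileScoped(Table table) {
        int pathFieldId = MetadataColumns.DELETE_FILE_PATH.fieldId();
        for (DeleteFile deleteFile : table.currentSnapshot().addedDeleteFiles(table.io())) {
          // Formats that write no column stats (e.g. Avro) carry no bounds.
          if (deleteFile.lowerBounds() == null || deleteFile.upperBounds() == null) {
            return false;
          }
          ByteBuffer lower = deleteFile.lowerBounds().get(pathFieldId);
          ByteBuffer upper = deleteFile.upperBounds().get(pathFieldId);
          if (lower == null || upper == null) {
            return false;
          }
          // Decode copies so the shared stat buffers are not mutated.
          String lowerPath = StandardCharsets.UTF_8.decode(lower.duplicate()).toString();
          String upperPath = StandardCharsets.UTF_8.decode(upper.duplicate()).toString();
          if (!lowerPath.equals(upperPath)) {
            return false; // the delete file spans more than one data file
          }
        }
        return true;
      }
    }

Under DeleteGranularity.PARTITION the same bounds would instead span the lexicographic minimum and maximum of every referenced data file path, so the exact-equality comparison in testDeleteStats would not generally hold.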