Flink: Backport #10200 to v1.19 and v1.17 (#10259)
pvary authored May 1, 2024
1 parent aeb2682 · commit 0323308
Showing 4 changed files with 74 additions and 2 deletions.
@@ -27,6 +27,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ public void write(RowData row) throws IOException {

protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema);
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}

@Override
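The functional change in this writer is the extra constructor argument: the Flink equality delta writer now requests file-scoped delete granularity, so each position delete file it produces references a single data file instead of accumulating deletes for a whole partition. As an illustration only (this commit hard-codes the value in the writer; the table property name "write.delete.granularity" and the updateProperties call are assumptions about the surrounding Iceberg API, not part of this change), the same preference could be expressed as a table-level default roughly like this:

import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.DeleteGranularity;

public class DeleteGranularityExample {
  // Sketch under stated assumptions: DeleteGranularity.FILE keeps position deletes
  // grouped per data file, while DeleteGranularity.PARTITION groups them per partition.
  static void preferFileScopedDeletes(Table table) {
    table
        .updateProperties()
        .set("write.delete.granularity", DeleteGranularity.FILE.toString())
        .commit();
  }
}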
@@ -18,12 +18,17 @@
*/
package org.apache.iceberg.flink.sink;

import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

@Rule public final Timeout globalTimeout = Timeout.seconds(60);

@Parameterized.Parameters(
name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public void testUpsertOnDataKey() throws Exception {
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}

@Test
public void testDeleteStats() throws Exception {
assumeThat(format).isNotEqualTo(FileFormat.AVRO);

List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
// Checkpoint #1
ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));

List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));

testChangeLogs(
ImmutableList.of("id", "data"),
row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
false,
elementsPerCheckpoint,
expectedRecords,
"main");

DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
String fromStat =
new String(
deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
}
}
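The new testDeleteStats case pins down what file-scoped granularity buys on the metadata side: because each delete file now references exactly one data file, the lower bound recorded for the delete file's file_path reference column is simply that data file's path. A small sketch of reading the same statistic outside the test (the class and method names and the explicit UTF-8 decoding are illustrative assumptions; the test itself decodes with new String(...)):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;

public class DeleteFileStats {
  // Returns the data file path recorded as the lower bound of the delete file's
  // file_path reference column, or null if bounds were not written (e.g. Avro).
  static String referencedDataFile(DeleteFile deleteFile) {
    Map<Integer, ByteBuffer> bounds = deleteFile.lowerBounds();
    if (bounds == null) {
      return null;
    }
    ByteBuffer lower = bounds.get(MetadataColumns.DELETE_FILE_PATH.fieldId());
    return lower == null ? null : StandardCharsets.UTF_8.decode(lower.duplicate()).toString();
  }
}

With partition granularity, a single delete file can span many data files, so the lower and upper bounds of this column would only bracket a range of paths rather than identify one file.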
@@ -27,6 +27,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.RowDataProjection;
@@ -109,7 +110,7 @@ public void write(RowData row) throws IOException {

protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema);
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}

@Override
@@ -18,12 +18,17 @@
*/
package org.apache.iceberg.flink.sink;

import static org.assertj.core.api.Assumptions.assumeThat;

import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.TableProperties;
@@ -45,6 +50,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@@ -61,6 +67,8 @@ public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
public final HadoopCatalogResource catalogResource =
new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);

@Rule public final Timeout globalTimeout = Timeout.seconds(60);

@Parameterized.Parameters(
name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
public static Object[][] parameters() {
@@ -233,4 +241,31 @@ public void testUpsertOnDataKey() throws Exception {
public void testUpsertOnIdDataKey() throws Exception {
testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
}

@Test
public void testDeleteStats() throws Exception {
assumeThat(format).isNotEqualTo(FileFormat.AVRO);

List<List<Row>> elementsPerCheckpoint =
ImmutableList.of(
// Checkpoint #1
ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")));

List<List<Record>> expectedRecords = ImmutableList.of(ImmutableList.of(record(1, "aaa")));

testChangeLogs(
ImmutableList.of("id", "data"),
row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
false,
elementsPerCheckpoint,
expectedRecords,
"main");

DeleteFile deleteFile = table.currentSnapshot().addedDeleteFiles(table.io()).iterator().next();
String fromStat =
new String(
deleteFile.lowerBounds().get(MetadataColumns.DELETE_FILE_PATH.fieldId()).array());
DataFile dataFile = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
assumeThat(fromStat).isEqualTo(dataFile.path().toString());
}
}
