diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index 1a56a8d93c..e802e10297 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -212,8 +212,9 @@ public DatasetDescriptor getDatasetDescriptor(FileSystem fs) { * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) { if (dstMetadata != null) { - // use current destination metadata as 'base metadata' and source as 'updated metadata' while committing - this.tableOps.commit(dstMetadata, srcMetadata.replaceProperties(dstMetadata.properties())); + // Use current destination metadata as 'base metadata', but commit the source-side metadata + // to synchronize source-side property deletion over to the destination + this.tableOps.commit(dstMetadata, srcMetadata); } } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 096962320a..a1a29444ed 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -35,6 +36,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; @@ -47,6 +49,7 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -181,6 +184,33 @@ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOExce } } + /** Verify that registerIcebergTable will update existing table properties */ + @Test + public void testNewTablePropertiesAreRegistered() throws Exception { + Map srcTableProperties = Maps.newHashMap(); + Map destTableProperties = Maps.newHashMap(); + + srcTableProperties.put("newKey", "newValue"); + // Expect the old value to be overwritten by the new value + srcTableProperties.put("testKey", "testValueNew"); + destTableProperties.put("testKey", "testValueOld"); + // Expect existing property values to be deleted if it does not exist on the source + destTableProperties.put("deletedTableProperty", "deletedTablePropertyValue"); + + TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable"); + catalog.createTable(destTableId, icebergSchema, null, destTableProperties); + + IcebergTable destIcebergTable = new IcebergTable(destTableId, catalog.newTableOps(destTableId), catalogUri); + // Mock a source table with the same table UUID copying new properties + TableMetadata newSourceTableProperties = destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties); + + destIcebergTable.registerIcebergTable(newSourceTableProperties, destIcebergTable.accessTableMetadata()); + Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().size(), 2); + Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"), "newValue"); + Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"), "testValueNew"); + Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty")); + } + /** full validation for a particular {@link IcebergSnapshotInfo} */ protected void verifySnapshotInfo(IcebergSnapshotInfo snapshotInfo, List> perSnapshotFilesets, int overallNumSnapshots) { // verify metadata file