diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index 9e2ae53b99..9e9e2309fe 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -43,7 +44,10 @@ protected BaseIcebergCatalog(String catalogName, Class compan @Override public IcebergTable openTable(String dbName, String tableName) { TableIdentifier tableId = TableIdentifier.of(dbName, tableName); - return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), createTableOperations(tableId), this.getCatalogUri()); + return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(), + createTableOperations(tableId), + this.getCatalogUri(), + loadTableInstance(tableId)); } protected Catalog createCompanionCatalog(Map properties, Configuration configuration) { @@ -67,4 +71,6 @@ protected String getDatasetDescriptorPlatform() { } protected abstract TableOperations createTableOperations(TableIdentifier tableId); + + protected abstract Table loadTableInstance(TableIdentifier tableId); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index f6668f5d18..39c1e7ad1c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -85,7 +85,7 @@ public String getConfigPrefix() { } protected final FileSystem sourceFs; - private final Properties properties; + protected final Properties properties; /** * Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog. 
@@ -153,7 +153,7 @@ protected IcebergDataset createIcebergDataset(IcebergCatalog sourceIcebergCatalo IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(destDbName, destTableName); // TODO: Rethink strategy to enforce dest iceberg table Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, destTableName)); - return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties)); + return createSpecificDataset(srcIcebergTable, destIcebergTable, properties, fs, getConfigShouldCopyMetadataPath(properties)); } protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { @@ -165,6 +165,11 @@ protected static IcebergCatalog createIcebergCatalog(Properties properties, Cata return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } + protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) + throws IcebergTable.TableNotFoundException { + return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, shouldIncludeMetadataPath); + } + protected static boolean getConfigShouldCopyMetadataPath(Properties properties) { return Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH, DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH)); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java index af541a79a5..27ea723df5 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; @@ -61,4 +62,9 @@ protected TableOperations createTableOperations(TableIdentifier tableId) { public boolean tableAlreadyExists(IcebergTable icebergTable) { return hc.tableExists(icebergTable.getTableId()); } + + @Override + protected Table loadTableInstance(TableIdentifier tableId) { + return hc.loadTable(tableId); + } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java new file mode 100644 index 0000000000..968b6fcce9 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.util.SerializationUtil; + +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.commit.CommitStep; +import org.apache.gobblin.util.retry.RetryerFactory; + +import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS; +import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES; +import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE; +import static org.apache.gobblin.util.retry.RetryerFactory.RetryType; + +/** + * Commit step for overwriting partitions in an Iceberg table. + *
+ * This class implements the {@link CommitStep} interface and provides functionality to overwrite + * partitions in the destination Iceberg table using serialized data files. + *
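+ * The retryer may be tuned through properties under {@code OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX}
+ * (the fallback allows 3 attempts at a fixed 3-second interval); a sketch mirroring the unit tests:
+ * <pre>
+ *   properties.setProperty(OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES, "7");
+ * </pre>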
+ */
+@Slf4j
+public class IcebergOverwritePartitionsStep implements CommitStep {
+  private final String destTableIdStr;
+  private final Properties properties;
+  private final byte[] serializedDataFiles;
+  private final String partitionColName;
+  private final String partitionValue;
+  public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX
+      + ".catalog.overwrite.partitions.retries";
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 3,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+  /**
+   * Constructs an {@code IcebergOverwritePartitionsStep} with the specified parameters.
+   *
+   * @param destTableIdStr the identifier of the destination table as a string
+   * @param partitionColName the name of the partition column whose data files are to be overwritten
+   * @param partitionValue the partition column value whose data files are to be overwritten
+   * @param serializedDataFiles the serialized data files to be used for overwriting partitions
+   * @param properties the properties containing configuration
+   */
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue, byte[] serializedDataFiles, Properties properties) {
+    this.destTableIdStr = destTableIdStr;
+    this.partitionColName = partitionColName;
+    this.partitionValue = partitionValue;
+    this.serializedDataFiles = serializedDataFiles;
+    this.properties = properties;
+  }
+
+  @Override
+  public boolean isCompleted() {
+    return false;
+  }
+
+  /**
+   * Executes the partition overwrite in the destination Iceberg table,
+   * retrying on failure with the same mechanism as {@link IcebergRegisterStep#execute()}.
+   *
+   * @throws IOException if an I/O error occurs during execution
+   */
+  @Override
+  public void execute() throws IOException {
+    IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
+    List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+    try {
+      log.info("Overwriting data files of partition {} with value {} for destination table : {} ",
+          this.partitionColName,
+          this.partitionValue,
+          this.destTableIdStr
+      );
+      Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
+      overwritePartitionsRetryer.call(() -> {
+        destTable.overwritePartitions(dataFiles, this.partitionColName, this.partitionValue);
+        return null;
+      });
+      log.info("Overwriting data files completed for partition {} with value {} for destination table : {} ",
+          this.partitionColName,
+          this.partitionValue,
+          this.destTableIdStr
+      );
+    } catch (ExecutionException executionException) {
+      String msg = String.format("Failed to overwrite partitions for destination iceberg table : {%s}", this.destTableIdStr);
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("Failed to overwrite partition for destination table : {%s} : (retried %d times) %s ",
+          this.destTableIdStr,
+          retryException.getNumberOfFailedAttempts(),
+          interruptedNote);
+      Throwable informativeException = retryException.getLastFailedAttempt().hasException()
+          ?
retryException.getLastFailedAttempt().getExceptionCause() + : retryException; + log.error(msg, informativeException); + throw new RuntimeException(msg, informativeException); + } + } + + protected IcebergCatalog createDestinationCatalog() throws IOException { + return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION); + } + + private Retryer createOverwritePartitionsRetryer() { + Config config = ConfigFactory.parseProperties(this.properties); + Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) + ? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX) + : ConfigFactory.empty(); + + return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + if (attempt.hasException()) { + String msg = String.format("Exception caught while overwriting partitions for destination table : {%s} : [attempt: %d; %s after start]", + destTableIdStr, + attempt.getAttemptNumber(), + Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString()); + log.warn(msg, attempt.getExceptionCause()); + } + } + })); + } +} \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java new file mode 100644 index 0000000000..1f1e5feba4 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.function.Predicate; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.util.SerializationUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableList; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.CopyableDataset; +import org.apache.gobblin.util.measurement.GrowthMilestoneTracker; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil; + +/** + * Iceberg Partition dataset implementing {@link CopyableDataset} + *
+ * This class extends {@link IcebergDataset} and provides functionality to filter partitions
+ * and generate copy entities for partition-based data movement.
+ *
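+ * Only partition columns with an {@code identity} or {@code truncate} transform are supported (see
+ * {@code supportedTransforms} below); predicate creation fails fast for any other transform.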
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+  private static final List<String> supportedTransforms = ImmutableList.of("identity", "truncate");
+  private final Predicate<StructLike> partitionFilterPredicate;
+  private final Map<Path, Path> srcPathToDestPath;
+  private final String partitionColumnName;
+  private final String partitionColValue;
+
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue)
+      throws IcebergTable.TableNotFoundException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+    this.partitionColumnName = partitionColumnName;
+    this.partitionColValue = partitionColValue;
+    this.partitionFilterPredicate = createPartitionFilterPredicate();
+    this.srcPathToDestPath = new HashMap<>();
+  }
+
+  private Predicate<StructLike> createPartitionFilterPredicate() throws IcebergTable.TableNotFoundException {
+    //TODO: Refactor later, e.g. using a factory, to support other types of filter predicate
+    // Also take into consideration creation of an Expression filter to be used in the overwrite API
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    int partitionColumnIndex = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
+        this.partitionColumnName,
+        srcTableMetadata,
+        supportedTransforms
+    );
+    return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue);
+  }
+
+  /**
+   * Represents a destination file path and the corresponding file status on the source file system.
+   * Both properties are used in creating a {@link CopyEntity}.
+   */
+  @Data
+  protected static final class FilePathsWithStatus {
+    private final Path destPath;
+    private final FileStatus srcFileStatus;
+  }
+
+  /**
+   * Generates copy entities for partition-based data movement.
+   * It finds the files specific to the partition and creates destination data files based on the source data files.
+   * It also re-prefixes each destination file path with the destination table's write data location and adds a UUID
+   * to the file name to avoid conflicts.
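+   * For instance (illustrative paths only), a source data file such as
+   * {@code <srcWriteDataLocation>/part-00000.orc} would be copied to
+   * {@code <destWriteDataLocation>/<random-uuid>-part-00000.orc}.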
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(this.sourceFs)) {
+      Path destPath = filePathsWithStatus.getDestPath();
+      FileStatus srcFileStatus = filePathsWithStatus.getSrcFileStatus();
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Avoid adding the post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+    List<DataFile> destDataFiles = new ArrayList<>();
+    if (srcDataFiles.isEmpty()) {
+      return destDataFiles;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      log.warn(
+          "Either source or destination table does not have write data location : source table write data location : {} , destination table write data location : {}",
+          srcWriteDataLocation,
+          destWriteDataLocation
+      );
+    }
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect the passed default value, so to avoid an NPE in .replace() we fall back to the empty string.
+    String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
+    String prefixToReplaceWith = (destWriteDataLocation != null) ?
destWriteDataLocation : ""; + GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker(); + srcDataFiles.forEach(dataFile -> { + String srcFilePath = dataFile.path().toString(); + Path updatedDestFilePath = relocateDestPath(srcFilePath, prefixToBeReplaced, prefixToReplaceWith); + destDataFiles.add(DataFiles.builder(partitionSpec) + .copy(dataFile) + .withPath(updatedDestFilePath.toString()) + .build()); + // Store the mapping of srcPath to destPath to be used in creating list of src file status to dest path + srcPathToDestPath.put(new Path(srcFilePath), updatedDestFilePath); + if (growthMilestoneTracker.isAnotherMilestone(destDataFiles.size())) { + log.info("Generated {} destination data files", destDataFiles.size()); + } + }); + log.info("Generated {} destination data files", destDataFiles.size()); + return destDataFiles; + } + + private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) { + String updPathStr = curPathStr.replace(prefixToBeReplaced, prefixToReplaceWith); + return addUUIDToPath(updPathStr); + } + + private Path addUUIDToPath(String filePathStr) { + Path filePath = new Path(filePathStr); + String fileDir = filePath.getParent().toString(); + String fileName = filePath.getName(); + String newFileName = String.join("-",UUID.randomUUID().toString(), fileName); + return new Path(fileDir, newFileName); + } + + private List getFilePathsStatus(FileSystem fs) throws IOException { + List filePathsStatus = new ArrayList<>(); + for (Map.Entry entry : this.srcPathToDestPath.entrySet()) { + Path srcPath = entry.getKey(); + Path destPath = entry.getValue(); + FileStatus srcFileStatus = fs.getFileStatus(srcPath); + filePathsStatus.add(new FilePathsWithStatus(destPath, srcFileStatus)); + } + return filePathsStatus; + } + + private PostPublishStep createOverwritePostPublishStep(List destDataFiles) { + byte[] serializedDataFiles = SerializationUtil.serializeToBytes(destDataFiles); + + IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep( + this.getDestIcebergTable().getTableId().toString(), + this.partitionColumnName, + this.partitionColValue, + serializedDataFiles, + this.properties + ); + + return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0); + } + +} \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java new file mode 100644 index 0000000000..b2ac1fc1bc --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; + +import com.google.common.base.Preconditions; + +import lombok.extern.slf4j.Slf4j; + +/** + * Finder class for locating and creating partitioned Iceberg datasets. + *
+ * This class extends {@link IcebergDatasetFinder} and provides functionality to create + * {@link IcebergPartitionDataset} instances based on the specified source and destination Iceberg catalogs. + *
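+ * An illustrative configuration (keys shown unqualified; at runtime each is resolved through
+ * {@code getLocationQualifiedProperty} against the source catalog, so the full property names differ):
+ * <pre>
+ *   partition.name=datepartition
+ *   partition.value=2024-01-01
+ * </pre>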
+ */ +@Slf4j +public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder { + public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name"; + public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value"; + + public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) { + super(sourceFs, properties); + } + + @Override + protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, + Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IcebergTable.TableNotFoundException { +// TODO: Add Validator for source and destination tables later +// TableMetadata srcTableMetadata = srcIcebergTable.accessTableMetadata(); +// TableMetadata destTableMetadata = destIcebergTable.accessTableMetadata(); +// IcebergTableMetadataValidator.validateSourceAndDestinationTablesMetadata(srcTableMetadata, destTableMetadata); + + String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_NAME_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName), + "Partition column name cannot be empty"); + + String partitionColumnValue = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE, + ICEBERG_PARTITION_VALUE_KEY); + Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnValue), + "Partition value cannot be empty"); + + return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs, + getConfigShouldCopyMetadataPath(properties), partitionColumnName, partitionColumnValue); + } +} \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index e802e10297..1c4bfdfbb2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -20,20 +20,29 @@ import java.io.IOException; import java.net.URI; import java.time.Instant; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; +import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import com.google.common.annotations.VisibleForTesting; @@ -77,10 +86,16 @@ public TableNotFoundException(TableIdentifier tableId) { private final String datasetDescriptorPlatform; private final TableOperations tableOps; private final String catalogUri; + private final Table table; @VisibleForTesting IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) { - this(tableId, 
tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri); + this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, null); + } + + @VisibleForTesting + IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri, Table table) { + this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, tableOps, catalogUri, table); } /** @return metadata info limited to the most recent (current) snapshot */ @@ -217,4 +232,56 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst this.tableOps.commit(dstMetadata, srcMetadata); } } + + /** + * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate. + * + * @param icebergPartitionFilterPredicate the predicate to filter partitions + * @return a list of data files that match the partition filter predicate + * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files + */ + public List getPartitionSpecificDataFiles(Predicate icebergPartitionFilterPredicate) throws IOException { + TableMetadata tableMetadata = accessTableMetadata(); + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + log.info("Starting to copy data files from snapshot: {}", currentSnapshot.snapshotId()); + //TODO: Add support for deleteManifests as well later + // Currently supporting dataManifests only + List dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io()); + List dataFileList = new ArrayList<>(); + for (ManifestFile manifestFile : dataManifestFiles) { + try (ManifestReader manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io()); + CloseableIterator dataFiles = manifestReader.iterator()) { + dataFiles.forEachRemaining(dataFile -> { + if (icebergPartitionFilterPredicate.test(dataFile.partition())) { + dataFileList.add(dataFile.copy()); + } + }); + } catch (IOException e) { + log.warn("Failed to read manifest file: {} " , manifestFile.path(), e); + } + } + log.info("Found {} data files to copy", dataFileList.size()); + return dataFileList; + } + + /** + * Overwrite partitions in the table with the specified list of data files. + *
+ * Overwriting a partition replaces all of its existing data files with the supplied data files, using an
+ * equality expression filter on the given partition column and value.
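+ * Note: rows are matched via {@code Expressions.equal(partitionColName, partitionValue)}, so exactly one
+ * partition value is overwritten per commit, and no additional conflict validation is configured.
+ *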
+ * @param dataFiles the list of data files to replace partitions with + * @param partitionColName the partition column name whose data files are to be replaced + * @param partitionValue the partition column value on which data files will be replaced + */ + protected void overwritePartitions(List dataFiles, String partitionColName, String partitionValue) { + if (dataFiles.isEmpty()) { + return; + } + OverwriteFiles overwriteFiles = this.table.newOverwrite(); + overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue)); + dataFiles.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + this.tableOps.refresh(); + } + } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java new file mode 100644 index 0000000000..d2167d3f66 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.Objects; +import java.util.function.Predicate; + +import org.apache.iceberg.StructLike; + +/** + * Predicate implementation for filtering Iceberg partitions based on specified partition values. + *
+ * This class filters partitions by checking whether the partition value at the configured column index
+ * matches the specified value.
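+ * For example (illustrative values), with column index {@code 0} and value {@code "2024-01-01"}, a
+ * partition tuple {@code ("2024-01-01")} passes, while {@code ("2024-01-02")}, a {@code null} partition,
+ * or a {@code null} partition value does not.
+ *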
+ */ +public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements Predicate { + private final int partitionColumnIndex; + private final String partitionValue; + + /** + * Constructs an {@code IcebergMatchesAnyPropNamePartitionFilterPredicate} with the specified parameters. + * + * @param partitionColumnIndex the index of the partition column in partition spec + * @param partitionValue the partition value to match + */ + public IcebergMatchesAnyPropNamePartitionFilterPredicate(int partitionColumnIndex, String partitionValue) { + this.partitionColumnIndex = partitionColumnIndex; + this.partitionValue = partitionValue; + } + + /** + * Check if the partition value matches any of the specified partition values. + * + * @param partition the partition to check + * @return {@code true} if the partition value matches any of the specified values, otherwise {@code false} + */ + @Override + public boolean test(StructLike partition) { + // Just a cautious check to avoid NPE, ideally partition shouldn't be null if table is partitioned + if (Objects.isNull(partition)) { + return false; + } + + Object partitionVal = partition.get(this.partitionColumnIndex, Object.class); + // Need this check to avoid NPE on partitionVal.toString() + if (Objects.isNull(partitionVal)) { + return false; + } + + return this.partitionValue.equals(partitionVal.toString()); + } +} \ No newline at end of file diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java new file mode 100644 index 0000000000..4bf78c122c --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg.predicates; + +import java.util.List; + +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.TableMetadata; + +/** + * Utility class for creating and managing partition filter predicates for Iceberg tables. + *
+ * This class provides a method to retrieve the index of a partition column in the table metadata,
+ * verifying that the column's partition transform is among the supported transforms.
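+ * For example (illustrative), for a spec partitioned by {@code identity(datepartition)},
+ * {@code getPartitionColumnIndex("datepartition", tableMetadata, supportedTransforms)} returns {@code 0};
+ * an unsupported transform such as {@code bucket} throws {@link IllegalArgumentException}, and an
+ * unmatched column name yields {@code -1}.
+ *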
+ * Note: This class is not meant to be instantiated. + *
+ */ +public class IcebergPartitionFilterPredicateUtil { + private IcebergPartitionFilterPredicateUtil() { + } + + /** + * Retrieves the index of the partition column from the partition spec in the table metadata. + * + * @param partitionColumnName the name of the partition column to find + * @param tableMetadata the metadata of the Iceberg table + * @param supportedTransforms a list of supported partition transforms + * @return the index of the partition column if found, otherwise -1 + * @throws IllegalArgumentException if the partition transform is not supported + */ + public static int getPartitionColumnIndex( + String partitionColumnName, + TableMetadata tableMetadata, + List supportedTransforms + ) { + List partitionFields = tableMetadata.spec().fields(); + for (int idx = 0; idx < partitionFields.size(); idx++) { + PartitionField partitionField = partitionFields.get(idx); + if (partitionField.name().equals(partitionColumnName)) { + String transform = partitionField.transform().toString().toLowerCase(); + if (!supportedTransforms.contains(transform)) { + throw new IllegalArgumentException( + String.format(" For ~{%s:%d}~ Partition transform %s is not supported. Supported transforms are %s", + partitionColumnName, + idx, + transform, + supportedTransforms)); + } + return idx; + } + } + return -1; + } +} \ No newline at end of file diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index b9babbc888..e92b913638 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; @@ -120,7 +121,10 @@ public void testGetDatasetDescriptor() throws URISyntaxException { TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName); String qualifiedTableName = "foo_prefix." 
+ tableId.toString(); String platformName = "Floe"; - IcebergTable table = new IcebergTable(tableId, qualifiedTableName, platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI); + IcebergTable table = new IcebergTable(tableId, qualifiedTableName, platformName, + Mockito.mock(TableOperations.class), + SRC_CATALOG_URI, + Mockito.mock(Table.class)); FileSystem mockFs = Mockito.mock(FileSystem.class); Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI); DatasetDescriptor expected = new DatasetDescriptor(platformName, URI.create(SRC_CATALOG_URI), qualifiedTableName); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java new file mode 100644 index 0000000000..46b0f4e82c --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.util.SerializationUtil; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; + +import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES; + +/** Tests for {@link IcebergOverwritePartitionsStep} */ +public class IcebergOverwritePartitionsStepTest { + private final String destTableIdStr = "db.foo"; + private final String testPartitionColName = "testPartition"; + private final String testPartitionColValue = "testValue"; + private IcebergTable mockIcebergTable; + private IcebergCatalog mockIcebergCatalog; + private Properties mockProperties; + private byte[] mockSerializedDataFiles; + private IcebergOverwritePartitionsStep mockIcebergOverwritePartitionsStep; + + @BeforeMethod + public void setUp() throws IOException { + mockIcebergTable = Mockito.mock(IcebergTable.class); + mockIcebergCatalog = Mockito.mock(IcebergCatalog.class); + mockProperties = new Properties(); + + List mockDataFiles = getDummyDataFiles(); + mockSerializedDataFiles = SerializationUtil.serializeToBytes(mockDataFiles); + + mockIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, + testPartitionColName, testPartitionColValue, mockSerializedDataFiles, mockProperties)); + + Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable); + Mockito.doReturn(mockIcebergCatalog).when(mockIcebergOverwritePartitionsStep).createDestinationCatalog(); + } + + private List getDummyDataFiles() { + DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to//db/foo/data/datafile1.orc") + .withFileSizeInBytes(1234) + .withRecordCount(100) + .build(); + + DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath("/path/to//db/foo/data/datafile2.orc") + .withFileSizeInBytes(9876) + .withRecordCount(50) + .build(); + + return ImmutableList.of(dataFile1, dataFile2); + } + + @Test + public void testIsCompleted() { + Assert.assertFalse(mockIcebergOverwritePartitionsStep.isCompleted()); + } + + @Test + public void testExecute() { + try { + Mockito.doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), Mockito.anyString(), + Mockito.anyString()); + mockIcebergOverwritePartitionsStep.execute(); + Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + } catch (IOException e) { + Assert.fail(String.format("Unexpected IOException : %s", e)); + } + } + + @Test + public void testExecuteWithRetry() { + try { + // first call throw exception which will be retried and on second call nothing happens + Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + mockIcebergOverwritePartitionsStep.execute(); + Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + } catch (IOException e) { + Assert.fail(String.format("Unexpected IOException : %s", e)); 
+ } + } + + @Test + public void testExecuteWithDefaultRetry() { + try { + // Always throw exception + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + mockIcebergOverwritePartitionsStep.execute(); + } catch (RuntimeException e) { + Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + assertRetryTimes(e, 3); + } catch (IOException e) { + Assert.fail(String.format("Unexpected IOException : %s", e)); + } + } + + /** Disabling this test to avoid interrupting thread */ + @Test(enabled = false) + public void testExecuteWithRetryAndInterrupt() { + // first call throw exception which will be retried and on second call nothing happens + Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + Thread.currentThread().interrupt(); + try { + mockIcebergOverwritePartitionsStep.execute(); + Assert.fail("Expected Runtime Exception to be thrown"); + } catch (RuntimeException e) { + Assert.assertTrue(e.getMessage().startsWith( + String.format("Failed to overwrite partition for destination table : {%s} : (retried 1 times) ... then interrupted ", destTableIdStr)), + e.getMessage()); + } catch (IOException e) { + Assert.fail("Expected Runtime Exception to be thrown"); + } + } + + @Test + public void testExecuteWithCustomRetryConfig() throws IOException { + int retryCount = 7; + mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES, + Integer.toString(retryCount)); + mockIcebergOverwritePartitionsStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr, + testPartitionColName, testPartitionColValue, mockSerializedDataFiles, mockProperties)); + Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable); + Mockito.doReturn(mockIcebergCatalog).when(mockIcebergOverwritePartitionsStep).createDestinationCatalog(); + try { + // Always throw exception + Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + mockIcebergOverwritePartitionsStep.execute(); + } catch (RuntimeException e) { + Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartitions(Mockito.anyList(), + Mockito.anyString(), Mockito.anyString()); + assertRetryTimes(e, retryCount); + } catch (IOException e) { + Assert.fail(String.format("Unexpected IOException : %s", e)); + } + } + + private void assertRetryTimes(RuntimeException re, Integer retryTimes) { + String msg = String.format("Failed to overwrite partition for destination table : {%s} : (retried %d times)", destTableIdStr, retryTimes); + Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage()); + } +} diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java new file mode 100644 index 0000000000..75e7af89a6 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyContext; +import org.apache.gobblin.data.management.copy.PreserveAttributes; +import org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil; +import org.apache.gobblin.dataset.DatasetDescriptor; + +import static org.mockito.ArgumentMatchers.any; + + +/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */ +public class IcebergPartitionDatasetTest { + private IcebergTable srcIcebergTable; + private IcebergTable destIcebergTable; + private TableMetadata srcTableMetadata; + private TableMetadata destTableMetadata; + private FileSystem sourceFs; + private FileSystem targetFs; + private IcebergPartitionDataset icebergPartitionDataset; + private MockedStatic icebergPartitionFilterPredicateUtil; + private static final String SRC_TEST_DB = "srcTestDB"; + private static final String SRC_TEST_TABLE = "srcTestTable"; + private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" + SRC_TEST_TABLE + "/data"; + private static final String DEST_TEST_DB = "destTestDB"; + private static final String DEST_TEST_TABLE = "destTestTable"; + private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + DEST_TEST_TABLE + "/data"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = "testPartition"; + private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = "testValue"; + private final Properties copyConfigProperties = new Properties(); + private final Properties properties = new Properties(); + private List srcFilePaths; + + private static final URI SRC_FS_URI; + private static final URI DEST_FS_URI; + + static { + try { + SRC_FS_URI = new 
URI("abc", "the.source.org", "/", null); + DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null); + } catch (URISyntaxException e) { + throw new RuntimeException("should not occur!", e); + } + } + + @BeforeMethod + public void setUp() throws Exception { + setupSrcFileSystem(); + setupDestFileSystem(); + + TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB, SRC_TEST_TABLE); + + srcIcebergTable = Mockito.mock(IcebergTable.class); + destIcebergTable = Mockito.mock(IcebergTable.class); + + srcTableMetadata = Mockito.mock(TableMetadata.class); + destTableMetadata = Mockito.mock(TableMetadata.class); + Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class)); + + Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier); + Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata); + Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata); + Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class)); + + icebergPartitionFilterPredicateUtil = Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class); + icebergPartitionFilterPredicateUtil + .when(() -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(), Mockito.any(), Mockito.any())) + .thenReturn(0); + + copyConfigProperties.setProperty("data.publisher.final.dir", "/test"); + srcFilePaths = new ArrayList<>(); + } + + @AfterMethod + public void cleanUp() { + srcFilePaths.clear(); + icebergPartitionFilterPredicateUtil.close(); + } + + private void setupSrcFileSystem() throws IOException { + sourceFs = Mockito.mock(FileSystem.class); + Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI); + Mockito.when(sourceFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(SRC_FS_URI, new Path("/"))); + Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> { + Path path = invocation.getArgument(0, Path.class); + Path qualifiedPath = sourceFs.makeQualified(path); + return getFileStatus(qualifiedPath); + }); + } + + private void setupDestFileSystem() throws IOException { + targetFs = Mockito.mock(FileSystem.class); + Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI); + Mockito.when(targetFs.makeQualified(any(Path.class))) + .thenAnswer(invocation -> invocation.getArgument(0, Path.class).makeQualified(DEST_FS_URI, new Path("/"))); + // Since we are adding UUID to the file name for every file while creating destination path, + // so return file not found exception if trying to find file status on destination file system + Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException()); + } + + @Test + public void testGenerateCopyEntities() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + List srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new 
CopyContext()).build(); + + Collection copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 2); + verifyCopyEntities(copyEntities, true); + } + + @Test + public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException { + List srcDataFiles = Lists.newArrayList(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true, TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE); + Collection copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, + Mockito.mock(CopyConfiguration.class)); + + // Since No data files are present, no copy entities should be generated + Assert.assertEquals(copyEntities.size(), 0); + } + + @Test + public void testMultipleCopyEntitiesGenerated() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc"); + srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc"); + + List srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + Collection copyEntities = icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 6); + verifyCopyEntities(copyEntities, true); + } + + @Test + public void testWithDifferentSrcAndDestTableWriteLocation() throws IOException { + srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc"); + Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(SRC_WRITE_LOCATION); + Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "")).thenReturn(DEST_WRITE_LOCATION); + + List srcDataFiles = getDataFiles(); + Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles); + + icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, sourceFs, + true); + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(targetFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) + .copyContext(new CopyContext()).build(); + + List copyEntities = + (List) icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration); + + Assert.assertEquals(copyEntities.size(), 2); + verifyCopyEntities(copyEntities, false); + } + + private List getDataFiles() throws IOException { + List dataFiles = new ArrayList<>(); + for (String srcFilePath : srcFilePaths) { + DataFile dataFile = Mockito.mock(DataFile.class); + Path dataFilePath = new Path(srcFilePath); + Path qualifiedPath = sourceFs.makeQualified(dataFilePath); + Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString()); + Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(getFileStatus(qualifiedPath)); + dataFiles.add(dataFile); + } + return dataFiles; + } + + private static FileStatus 
+
+  private static FileStatus getFileStatus(Path path) {
+    FileStatus fileStatus = new FileStatus();
+    fileStatus.setPath(path);
+    return fileStatus;
+  }
+
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, boolean sameSrcAndDestWriteLocation) {
+    String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
+    String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation ? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
+    String srcErrorMsg = String.format("Source Location should start with %s", srcWriteLocationStart);
+    String destErrorMsg = String.format("Destination Location should start with %s", destWriteLocationStart);
+    for (CopyEntity copyEntity : copyEntities) {
+      String json = copyEntity.toString();
+      if (isCopyableFile(json)) {
+        String originFilepath = getOriginFilePathAsStringFromJson(json);
+        String destFilepath = getDestinationFilePathAsStringFromJson(json);
+        Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart), srcErrorMsg);
+        Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart), destErrorMsg);
+        String originFileName = originFilepath.substring(srcWriteLocationStart.length() + 1);
+        String destFileName = destFilepath.substring(destWriteLocationStart.length() + 1);
+        Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect file name in destination path");
+        Assert.assertTrue(destFileName.length() > originFileName.length() + 1,
+            "Destination file name should be longer than source file name as UUID is appended");
+      } else {
+        verifyPostPublishStep(json);
+      }
+    }
+  }
+
+  private static void verifyPostPublishStep(String json) {
+    String expectedCommitStep = "org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep";
+    String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonObject("object-data")
+        .getAsJsonObject("step")
+        .getAsJsonPrimitive("object-type")
+        .getAsString();
+    Assert.assertEquals(actualCommitStep, expectedCommitStep);
+  }
+
+  private static boolean isCopyableFile(String json) {
+    String objectType = new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonPrimitive("object-type")
+        .getAsString();
+    return objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+  }
+
+  private static String getOriginFilePathAsStringFromJson(String json) {
+    return new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonObject("object-data")
+        .getAsJsonObject("origin")
+        .getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
+        .getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+  }
+
+  private static String getDestinationFilePathAsStringFromJson(String json) {
+    return new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonObject("object-data")
+        .getAsJsonObject("destination")
+        .getAsJsonObject("object-data")
+        .getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+  }
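The Gson helpers above hard-code the nesting of Gobblin's serialized `CopyEntity`. An abbreviated, reconstructed example of the JSON they walk; the shape is inferred from the accessor chains and the field values are invented, so treat it as a sketch rather than captured output:

```java
import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class CopyEntityJsonSketch {
  public static void main(String[] args) {
    // Shape inferred from getOriginFilePathAsStringFromJson(...); values are placeholders
    String json = "{"
        + "\"object-type\":\"org.apache.gobblin.data.management.copy.CopyableFile\","
        + "\"object-data\":{"
        +   "\"origin\":{\"object-data\":{\"path\":{\"object-data\":"
        +     "{\"uri\":{\"object-data\":\"abc://the.source.org/srcWriteLocation/file1.orc\"}}}}},"
        +   "\"destination\":{\"object-data\":"
        +     "{\"uri\":{\"object-data\":\"xyz://the.dest.org/srcWriteLocation/some-uuid-file1.orc\"}}}"
        + "}}";
    // Same traversal as getOriginFilePathAsStringFromJson(...)
    String origin = new Gson().fromJson(json, JsonObject.class)
        .getAsJsonObject("object-data").getAsJsonObject("origin")
        .getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
        .getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
    System.out.println(origin);  // abc://the.source.org/srcWriteLocation/file1.orc
  }
}
```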
+
+  /**
+   * See {@link org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetTest.TrickIcebergDataset}
+   */
+  protected static class TestIcebergPartitionDataset extends IcebergPartitionDataset {
+
+    public TestIcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
+        Properties properties, FileSystem sourceFs, boolean shouldIncludeMetadataPath)
+        throws IcebergTable.TableNotFoundException {
+      super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath,
+          TEST_ICEBERG_PARTITION_COLUMN_NAME, TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+    }
+
+    @Override
+    protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Configuration hadoopConfig) {
+      return this.sourceFs;
+    }
+  }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index a1a29444ed..4617011d74 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,8 +21,10 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -33,8 +35,11 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -61,7 +66,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
       .fields()
       .name("id")
       .type()
-      .longType()
+      .stringType()
       .noDefault()
       .endRecord();
   protected static final Schema icebergSchema = AvroSchemaUtil.toIceberg(avroDataSchema);
@@ -333,4 +338,119 @@ protected static <T> void verifyAnyOrder(Collection<T> actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test(enabled = false)
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, "testTable");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    List<PartitionData> partitionDataList = Collections.nCopies(paths.size(), partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using alwaysTrue & alwaysFalse predicates to avoid mocking the predicate class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(), 5);
+    Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(), 0);
+
+    catalog.dropTable(testTableId);
+  }
"/path/tableName/data/id=1/file2.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData.set(0, "1"); + PartitionData partitionData2 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData2.set(0, "1"); + List partitionDataList = Arrays.asList(partitionData, partitionData2); + + addPartitionDataFiles(testTable, paths, partitionDataList); + + IcebergTable icebergTable = new IcebergTable(testTableId, + catalog.newTableOps(testTableId), + catalogUri, + catalog.loadTable(testTableId)); + + verifyAnyOrder(paths, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List paths2 = Arrays.asList( + "/path/tableName/data/id=2/file3.orc", + "/path/tableName/data/id=2/file4.orc" + ); + // Using the schema defined in start of this class + PartitionData partitionData3 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData3.set(0, "2"); + PartitionData partitionData4 = new PartitionData(icebergPartitionSpec.partitionType()); + partitionData4.set(0, "2"); + List partitionDataList2 = Arrays.asList(partitionData3, partitionData4); + + List dataFiles2 = getDataFiles(paths2, partitionDataList2); + // here, since partition data with value 2 doesn't exist yet, we expect it to get added to the table + icebergTable.overwritePartitions(dataFiles2, "id", "2"); + List expectedPaths2 = new ArrayList<>(paths); + expectedPaths2.addAll(paths2); + verifyAnyOrder(expectedPaths2, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + List paths3 = Arrays.asList( + "/path/tableName/data/id=1/file5.orc", + "/path/tableName/data/id=1/file6.orc" + ); + // Reusing same partition dats to create data file with different paths + List dataFiles3 = getDataFiles(paths3, partitionDataList); + // here, since partition data with value 1 already exists, we expect it to get updated in the table with newer path + icebergTable.overwritePartitions(dataFiles3, "id", "1"); + List expectedPaths3 = new ArrayList<>(paths2); + expectedPaths3.addAll(paths3); + verifyAnyOrder(expectedPaths3, icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths should match"); + + catalog.dropTable(testTableId); + } + + private static void addPartitionDataFiles(Table table, List paths, List partitionDataList) { + Assert.assertEquals(paths.size(), partitionDataList.size()); + getDataFiles(paths, partitionDataList).forEach(dataFile -> table.newAppend().appendFile(dataFile).commit()); + } + + private static List getDataFiles(List paths, List partitionDataList) { + Assert.assertEquals(paths.size(), partitionDataList.size()); + List dataFiles = Lists.newArrayList(); + for (int i = 0; i < paths.size(); i++) { + dataFiles.add(createDataFileWithPartition(paths.get(i), partitionDataList.get(i))); + } + return dataFiles; + } + + private static DataFile createDataFileWithPartition(String path, PartitionData partitionData) { + return DataFiles.builder(icebergPartitionSpec) + .withPath(path) + .withFileSizeInBytes(8) + .withRecordCount(1) + .withPartition(partitionData) + .withFormat(FileFormat.ORC) + .build(); + } + } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java 
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
new file mode 100644
index 0000000000..4eb16500e6
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import org.apache.iceberg.StructLike;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate} */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicateTest {
+  private static final String TEST_PARTITION_VALUE_1 = "value1";
+  private IcebergMatchesAnyPropNamePartitionFilterPredicate predicate;
+
+  @BeforeMethod
+  public void setup() {
+    predicate = new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, TEST_PARTITION_VALUE_1);
+  }
+
+  @Test
+  public void testPartitionValueNULL() {
+    // The unstubbed mock returns null for the partition value
+    Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class)));
+  }
+
+  @Test
+  public void testWhenPartitionIsNull() {
+    Assert.assertFalse(predicate.test(null));
+  }
+
+  @Test
+  public void testPartitionValueMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("value1");
+    Assert.assertTrue(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueDoesNotMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), Mockito.eq(Object.class))).thenReturn("");
+    Assert.assertFalse(predicate.test(mockPartition));
+  }
+}
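The class under test is not part of this excerpt, but its contract is fully pinned down by the four tests above: null partition → false, null partition value → false, otherwise string-compare against the configured value. A sketch consistent with those tests (constructor shape taken from `setup()`; the production class may differ):

```java
import java.util.function.Predicate;

import org.apache.iceberg.StructLike;

// Sketch of IcebergMatchesAnyPropNamePartitionFilterPredicate, reconstructed from its tests.
public class MatchesAnyPropNamePredicateSketch implements Predicate<StructLike> {
  private final int partitionColumnIndex;
  private final String partitionValue;

  public MatchesAnyPropNamePredicateSketch(int partitionColumnIndex, String partitionValue) {
    this.partitionColumnIndex = partitionColumnIndex;
    this.partitionValue = partitionValue;
  }

  @Override
  public boolean test(StructLike partition) {
    if (partition == null) {
      return false;                                   // testWhenPartitionIsNull
    }
    Object value = partition.get(partitionColumnIndex, Object.class);
    return value != null                              // testPartitionValueNULL (unstubbed mock)
        && partitionValue.equals(value.toString());   // "value1" matches, "" does not
  }
}
```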
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
new file mode 100644
index 0000000000..c6b556c8c3
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.List;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.Transform;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil} */
+public class IcebergPartitionFilterPredicateUtilTest {
+  private TableMetadata mockTableMetadata;
+  private final List<String> supportedTransforms = ImmutableList.of("supported1", "supported2");
+
+  private void setupMockData(String name, String transform) {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+
+    PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+    PartitionField mockPartitionField = Mockito.mock(PartitionField.class);
+    Transform mockTransform = Mockito.mock(Transform.class);
+
+    List<PartitionField> partitionFields = ImmutableList.of(mockPartitionField);
+
+    Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+    Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+    Mockito.when(mockPartitionField.name()).thenReturn(name);
+    Mockito.when(mockPartitionField.transform()).thenReturn(mockTransform);
+    Mockito.when(mockTransform.toString()).thenReturn(transform);
+  }
+
+  @Test
+  public void testPartitionTransformNotSupported() {
+    setupMockData("col1", "unsupported");
+    IllegalArgumentException exception = Assert.expectThrows(IllegalArgumentException.class, () -> {
+      IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", mockTableMetadata, supportedTransforms);
+    });
+    Assert.assertTrue(exception.getMessage().contains(
+        "Partition transform unsupported is not supported. Supported transforms are [supported1, supported2]"));
+  }
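`getPartitionColumnIndex` is likewise absent from this excerpt. The tests in this file fix its behavior: scan the spec's partition fields for the requested column, reject unsupported transforms with the exact message asserted in `testPartitionTransformNotSupported`, and return the field's index, or `-1` when the column is missing. A sketch under those assumptions (signature taken from the call sites):

```java
import java.util.List;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.TableMetadata;

// Sketch of IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex, reconstructed from its tests.
public final class PartitionColumnIndexSketch {
  public static int getPartitionColumnIndex(String partitionColumnName, TableMetadata tableMetadata,
      List<String> supportedTransforms) {
    List<PartitionField> fields = tableMetadata.spec().fields();
    for (int idx = 0; idx < fields.size(); idx++) {
      PartitionField field = fields.get(idx);
      if (field.name().equals(partitionColumnName)) {
        String transform = field.transform().toString();
        if (!supportedTransforms.contains(transform)) {
          throw new IllegalArgumentException(String.format(
              "Partition transform %s is not supported. Supported transforms are %s",
              transform, supportedTransforms));
        }
        return idx;
      }
    }
    return -1;  // column not present in the partition spec
  }
}
```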
+
+  @Test
+  public void testPartitionTransformSupported() {
+    setupMockData("col1", "supported1");
+    int result = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", mockTableMetadata, supportedTransforms);
+    Assert.assertEquals(result, 0);
+  }
+
+  @Test
+  public void testPartitionColumnNotFound() {
+    setupMockData("col", "supported1");
+    int result = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2", mockTableMetadata, supportedTransforms);
+    Assert.assertEquals(result, -1);
+  }
+
+  @Test
+  public void testPartitionColumnFoundIndex1() {
+    mockTableMetadata = Mockito.mock(TableMetadata.class);
+    PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+    PartitionField mockPartitionField1 = Mockito.mock(PartitionField.class);
+    PartitionField mockPartitionField2 = Mockito.mock(PartitionField.class);
+    Transform mockTransform1 = Mockito.mock(Transform.class);
+    Transform mockTransform2 = Mockito.mock(Transform.class);
+
+    List<PartitionField> partitionFields = ImmutableList.of(mockPartitionField1, mockPartitionField2);
+
+    Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+    Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+    Mockito.when(mockPartitionField1.name()).thenReturn("col1");
+    Mockito.when(mockPartitionField1.transform()).thenReturn(mockTransform1);
+    Mockito.when(mockTransform1.toString()).thenReturn("supported1");
+    Mockito.when(mockPartitionField2.name()).thenReturn("col2");
+    Mockito.when(mockPartitionField2.transform()).thenReturn(mockTransform2);
+    Mockito.when(mockTransform2.toString()).thenReturn("supported2");
+
+    int result = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2", mockTableMetadata, supportedTransforms);
+    Assert.assertEquals(result, 1);
+  }
+}
\ No newline at end of file