diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle
index af64d4ea0f..da93b9014e 100644
--- a/defaultEnvironment.gradle
+++ b/defaultEnvironment.gradle
@@ -25,8 +25,22 @@ subprojects {
     maven {
       url "https://repository.cloudera.com/artifactory/cloudera-repos/"
     }
+
+    // Conjars is a read-only repository that hosts older artifacts needed by hive 1.0.1 and hadoop 2.10
     maven {
-      url "http://conjars.org/repo"
+      url "https://conjars.wensel.net/repo/"
+      content {
+        // Required for:
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > net.hydromatic:linq4j:0.4
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > net.hydromatic:quidem:0.1.1
+        includeGroup "net.hydromatic"
+        // Required for:
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > eigenbase:eigenbase-properties:1.1.4
+        includeGroup "eigenbase"
+        // Required for:
+        // org.apache.hadoop:hadoop-common:2.10.0 > org.apache.hadoop:hadoop-auth:2.10.0 > com.nimbusds:nimbus-jose-jwt:4.41.1 > net.minidev:json-smart:[1.3.1,2.3]
+        includeGroup "net.minidev"
+      }
     }
   }
diff --git a/gobblin-admin/build.gradle b/gobblin-admin/build.gradle
index 7b5ee28f1a..cef3493b62 100644
--- a/gobblin-admin/build.gradle
+++ b/gobblin-admin/build.gradle
@@ -19,9 +19,6 @@ apply plugin: 'java'

 repositories {
   mavenCentral()
-  maven {
-    url "http://conjars.org/repo"
-  }
 }

 dependencies {
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index f3ef3309a2..0ac4dcc0b8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.iceberg;

 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableOperations;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 15211c8b54..ac342e2e3a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.iceberg;

 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
@@ -28,4 +29,5 @@ public interface IcebergCatalog {
   IcebergTable openTable(String dbName, String tableName);
   String getCatalogUri();
   void initialize(Map<String, String> properties, Configuration configuration);
+  boolean tableAlreadyExists(IcebergTable icebergTable);
 }
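Reviewer note on the new `tableAlreadyExists` contract: an explicit existence check is needed because `openTable` merely builds a table handle and does not consult the metastore. As a minimal sketch (not part of this PR), an implementation backed by Iceberg's generic `Catalog` API could satisfy the contract as below, assuming a `catalog` field of type `org.apache.iceberg.catalog.Catalog`; the `HiveCatalog`-based implementation further down delegates to `HiveCatalog#tableExists` in just this way:

```java
@Override
public boolean tableAlreadyExists(IcebergTable icebergTable) {
  // Catalog#tableExists consults the backing metastore directly
  return catalog.tableExists(icebergTable.getTableId());
}
```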
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 2119daef5e..f4db7d4ff7 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
 import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,21 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  /** Destination {@link IcebergTable}, presumed to exist */
+  private final IcebergTable destIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired

-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
-  /** Target database name */
-  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+  /** Destination database name */
+  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";

-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.destIcebergTable = destIcebergTable;
     this.properties = properties;
     this.sourceFs = sourceFs;
   }
@@ -154,6 +155,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
+    copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable));
     log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
     return copyEntities;
   }
@@ -163,7 +165,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
    * @return a map of path, file status for each file that needs to be copied
    */
   protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
-    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergTable icebergTable = this.getSrcIcebergTable();
     /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
     Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
         // omit considering timestamp (or other markers of freshness), as files should be immutable
@@ -307,10 +309,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }

   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }

   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.destIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+    return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
   }
 }
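Since `generateCopyEntities` now appends a registration step per file set, callers see exactly one non-file entity alongside the `CopyableFile`s. A hedged TestNG-style sketch of that invariant (variable names borrowed from the tests below; this assertion is not itself part of the PR):

```java
Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration);
// one PostPublishStep should accompany the CopyableFile entities
Assert.assertEquals(copyEntities.stream().filter(ce -> ce instanceof PostPublishStep).count(), 1L);
```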
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index bc111dc256..b20a1bc292 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -25,18 +25,22 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
+

 /**
  * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
  * and creates a {@link IcebergDataset} for each one.
@@ -44,15 +48,22 @@
 @Slf4j
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
-
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
   public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
   public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
   public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
+  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
+  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
+  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
+
+  public enum CatalogLocation {
+    SOURCE,
+    DESTINATION
+  }

   protected final FileSystem sourceFs;
   private final Properties properties;
@@ -74,18 +85,13 @@ public List<IcebergDataset> findDatasets() throws IOException {
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

-    try {
-      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-      /* Each Iceberg dataset maps to an Iceberg table
-       * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-       */
-      matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-      log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
-          matchingDatasets, dbName, tblName);
-      return matchingDatasets;
-    } catch (ReflectiveOperationException exception) {
-      throw new IOException(exception);
-    }
+    IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    /* Each Iceberg dataset maps to an Iceberg table */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
+        matchingDatasets, dbName, tblName); // until support for multiple Iceberg tables is added, the count is always expected to be one
+    return matchingDatasets;
   }

 @Override
@@ -98,20 +104,44 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }

-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
+   * Note: the destination-side {@link IcebergTable} should be present before initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
+    // TODO: Rethink strategy to enforce dest iceberg table
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }

-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required; provide it via %s", ICEBERG_SRC_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required; provide it via %s", ICEBERG_DEST_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format("Unsupported location: %s for creating an Iceberg catalog", location));
+    }
+    catalogProperties.put(CatalogProperties.URI, catalogUri);
     return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
   }
 }
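For reference, a hypothetical job configuration exercising the new keys, assuming `DatasetConstants.PLATFORM_ICEBERG` resolves to `iceberg`. Note the asymmetry that survives review: the destination URI key carries an extra `.copy` segment (`.copy.destination.catalog.uri`), unlike its `.destination.catalog.class` and `.destination.cluster.name` counterparts:

```java
Properties props = new Properties();
props.setProperty("iceberg.dataset.database.name", "db1");
props.setProperty("iceberg.dataset.table.name", "tbl1");
props.setProperty("iceberg.dataset.source.catalog.uri", "thrift://src-metastore:9083");            // hypothetical URI
props.setProperty("iceberg.dataset.copy.destination.catalog.uri", "thrift://dest-metastore:9083"); // hypothetical URI
IcebergDatasetFinder finder = new IcebergDatasetFinder(sourceFs, props); // sourceFs assumed in scope
```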
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index 5525750e64..af541a79a5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -18,11 +18,13 @@ package org.apache.gobblin.data.management.copy.iceberg;

 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
+
 import lombok.extern.slf4j.Slf4j;
@@ -47,11 +49,16 @@ public void initialize(Map properties, Configuration configurati
   @Override
   public String getCatalogUri() {
-    return hc.getConf().get(CatalogProperties.URI, "<>");
+    return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<>");
   }

   @Override
   protected TableOperations createTableOperations(TableIdentifier tableId) {
     return hc.newTableOps(tableId);
   }
+
+  @Override
+  public boolean tableAlreadyExists(IcebergTable icebergTable) {
+    return hc.tableExists(icebergTable.getTableId());
+  }
 }
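The `getCatalogUri` fix matters because `HiveCatalog.initialize` stores its endpoint on the Hadoop `Configuration` under the Hive key `hive.metastore.uris`, not under Iceberg's `CatalogProperties.URI` (`"uri"`), so the old lookup always fell back to the `"<>"` default. A minimal illustration (hypothetical URI):

```java
Configuration conf = new Configuration();
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://metastore.example.org:9083");
String miss = conf.get("uri", "<>");                                   // "<>": Iceberg's key is never set on the conf
String uri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "<>");  // the configured thrift URI
```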
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
new file mode 100644
index 0000000000..75f26787b0
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable destIcebergTable;
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return false;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    TableMetadata destinationMetadata = null;
+    try {
+      destinationMetadata = this.destIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.warn("Destination TableMetadata doesn't exist", tnfe);
+    }
+    this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
+  }
+}
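Because `isCompleted()` unconditionally returns `false`, the register step re-executes on every commit attempt. Should idempotency be wanted later, one hedged sketch (deliberately not in this PR) is to report completion once the destination already points at the source's current metadata file:

```java
@Override
public boolean isCompleted() throws IOException {
  try {
    return destIcebergTable.accessTableMetadata().metadataFileLocation()
        .equals(srcIcebergTable.accessTableMetadata().metadataFileLocation());
  } catch (IcebergTable.TableNotFoundException tnfe) {
    return false; // nothing registered on the destination yet
  }
}
```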
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index f65a81417f..e8d0ee0ac2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -26,10 +26,6 @@ import java.util.Set;
 import java.util.stream.Collectors;

-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;

+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -67,7 +67,7 @@ public TableNotFoundException(TableIdentifier tableId) {
       this.tableId = tableId;
     }
   }
-
+  @Getter
   private final TableIdentifier tableId;
   private final TableOperations tableOps;
   private final String catalogUri;
@@ -194,4 +194,11 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
     descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
     return descriptor;
   }
+  /** Registers {@link IcebergTable} after publishing data.
+   * @param dstMetadata is null if the destination {@link IcebergTable} is absent, in which case registration is skipped */
+  protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
+    if (dstMetadata != null) {
+      this.tableOps.commit(dstMetadata, srcMetadata);
+    }
+  }
 }
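On the argument order in `registerIcebergTable`: Iceberg's `TableOperations.commit(base, metadata)` atomically replaces `base` with `metadata` and rejects the commit if `base` no longer matches the table's current state, so the destination's current metadata must be the base and the source's metadata the update. An equivalent standalone sketch (handles are hypothetical):

```java
TableOperations destOps = destCatalog.newTableOps(tableId); // e.g. a HiveCatalog for the destination
TableMetadata base = destOps.refresh();                     // destination's current state
destOps.commit(base, srcMetadata);                          // swap in the source's metadata
```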
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 9478207f8d..c1872cb4ad 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -188,12 +188,11 @@ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOExc
     MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI);
     FileSystem sourceFs = sourceFsBuilder.build();

-    IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, null, new Properties(), sourceFs);

     MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destFsBuilder.build();
     Mockito.doThrow(new IOException("Ha - not so fast!")).when(destFs).getFileStatus(new Path(SNAPSHOT_PATHS_0.manifestListPath));
-
     CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
     icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
   }
@@ -227,9 +226,10 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();

-    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
     IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);

     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -252,9 +252,10 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();

-    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
+    IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
     IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);

     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -282,8 +283,9 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc
     sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
     FileSystem sourceFs = sourceBuilder.build();

-    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);

     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -293,7 +295,6 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc
         // preserving attributes for owner, group and permissions respectively
         .preserve(PreserveAttributes.fromMnemonicString("ugp"))
         .copyContext(new CopyContext()).build();
-
     Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyFsOwnershipAndPermissionPreservation(copyEntities, sourceBuilder.getPathsAndFileStatuses());
   }
@@ -310,8 +311,9 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();

-    IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);

     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -321,7 +323,6 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw
         // without preserving attributes for owner, group and permissions
         .preserve(PreserveAttributes.fromMnemonicString(""))
         .copyContext(new CopyContext()).build();
-
     Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses);
   }
@@ -348,7 +349,7 @@ protected IcebergTable validateGetFilePathsGivenDestState(List …
@@ … @@ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
-      String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json);
-      actual.add(filepath);
+      if (isCopyableFile(json)) {
+        String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+        actual.add(filepath);
+      } else {
+        verifyPostPublishStep(json);
+      }
     }
     Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }

+  private static boolean isCopyableFile(String json) {
+    String objectType = new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonPrimitive("object-type")
+        .getAsString();
+    return objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+  }
+
   private static void verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, Map<Path, FileStatus> expectedPathsAndFileStatuses) {
     for (CopyEntity copyEntity : copyEntities) {
       String copyEntityJson = copyEntity.toString();
-      List<CopyEntityDeserializer.FileOwnerAndPermissions> ancestorFileOwnerAndPermissionsList = CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
-      CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
-      Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
-      FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
-      verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
-      // providing path's parent to verify ancestor owner and permissions
-      verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(), expectedPathsAndFileStatuses);
+      if (isCopyableFile(copyEntityJson)) {
+        List<CopyEntityDeserializer.FileOwnerAndPermissions> ancestorFileOwnerAndPermissionsList =
+            CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
+        CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
+        Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+        FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
+        verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
+        // providing path's parent to verify ancestor owner and permissions
+        verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(),
+            expectedPathsAndFileStatuses);
+      } else {
+        verifyPostPublishStep(copyEntityJson);
+      }
     }
   }
@@ -419,14 +437,21 @@ private static void verifyAncestorPermissions(List …
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ … @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   public static final String GMCE_METADATA_WRITER_CLASSES = "gmce.metadata.writer.classes";
   public static final String GMCE_METADATA_WRITER_MAX_ERROR_DATASET = "gmce.metadata.writer.max.error.dataset";
   public static final String TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.transient.exception.messages";
+  public static final String NON_TRANSIENT_EXCEPTION_MESSAGES_KEY = "gmce.metadata.writer.nonTransient.exception.messages";
   public static final int DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET = 0;
   public static final int DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS = 60000;
   public static final String TABLE_NAME_DELIMITER = ".";
@@ -124,10 +125,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
   private Map<String, List<String>> partitionKeysMap;
   private Closer closer = Closer.create();
   protected final AtomicLong recordCount = new AtomicLong(0L);
+  private final Set<String> currentErrorDatasets = new HashSet<>();
   @Setter
   private int maxErrorDataset;
   protected EventSubmitter eventSubmitter;
   private final Set<String> transientExceptionMessages;
+  private final Set<String> nonTransientExceptionMessages;

   @AllArgsConstructor
   static class TableStatus {
@@ -161,6 +164,7 @@ public GobblinMCEWriter(DataWriterBuilder builder, State
     MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
     eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
     transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
+    nonTransientExceptionMessages = new HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
   }
@@ -339,7 +343,7 @@ void writeWithMetadataWriters(
       try {
         writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
       } catch (Exception e) {
-        if (isExceptionTransient(e, transientExceptionMessages)) {
+        if (exceptionMatches(e, transientExceptionMessages)) {
           throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
         }
         meetException = true;
@@ -390,8 +394,13 @@ private void addOrThrowException(Exception e, String tableString, String dbName,
       lastException.droppedPartitionValues.addAll(((HiveMetadataWriterWithPartitionInfoException) e).droppedPartitionValues);
     }
     this.datasetErrorMap.put(tableStatus.datasetPath, tableErrorMap);
-    log.error(String.format("Meet exception when flush table %s", tableString), e);
-    if (datasetErrorMap.size() > maxErrorDataset) {
+    if (!exceptionMatches(e, this.nonTransientExceptionMessages)) {
+      currentErrorDatasets.add(tableStatus.datasetPath);
+      log.error(String.format("Meet exception when flush table %s", tableString), e);
+    } else {
+      log.error(String.format("Detected known non-transient failure for table %s", tableString), e);
+    }
+    if (currentErrorDatasets.size() > maxErrorDataset) {
       //Fail the job if the error size exceeds some number
       throw new IOException(String.format("Container fails to flush for more than %s dataset, last exception we met is: ", maxErrorDataset), e);
     }
@@ -412,7 +421,7 @@ private void flush(String dbName, String tableName) throws IOException {
       try {
         writer.flush(dbName, tableName);
       } catch (IOException e) {
-        if (isExceptionTransient(e, transientExceptionMessages)) {
+        if (exceptionMatches(e, transientExceptionMessages)) {
           throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
         }
         meetException = true;
@@ -435,11 +444,12 @@
   }

   /**
-   * Check if exception is contained within a known list of transient exceptions. These exceptions should not be caught
-   * to avoid advancing watermarks and skipping GMCEs unnecessarily.
+   * Check if the exception matches a configured list of known exception messages. Transient exceptions should not be
+   * caught, to avoid advancing watermarks and skipping GMCEs unnecessarily, while non-transient exceptions should not
+   * count towards the maximum number of failed datasets.
    */
-  public static boolean isExceptionTransient(Exception e, Set<String> transientExceptionMessages) {
-    return transientExceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
+  public static boolean exceptionMatches(Exception e, Set<String> exceptionMessages) {
+    return exceptionMessages.stream().anyMatch(message -> Throwables.getRootCause(e).toString().contains(message));
   }

   /**
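Operationally, both exception lists are plain comma-separated properties consumed through `getPropAsList`, with each configured fragment matched against the root cause's `toString`. A hypothetical wiring:

```java
Properties props = new Properties();
props.setProperty("gmce.metadata.writer.transient.exception.messages", "Filesystem closed,Hive timeout");
props.setProperty("gmce.metadata.writer.nonTransient.exception.messages", "Write failed due to bad schema");
```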
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
index 129817b5b9..415a1e5c81 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriterTest.java
@@ -213,12 +213,12 @@ public void testDetectTransientException() {
     Set<String> transientExceptions = Sets.newHashSet("Filesystem closed", "Hive timeout", "RejectedExecutionException");
     IOException transientException = new IOException("test1 Filesystem closed test");
     IOException wrapperException = new IOException("wrapper exception", transientException);
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(transientException, transientExceptions));
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(wrapperException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(transientException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(wrapperException, transientExceptions));
     IOException nonTransientException = new IOException("Write failed due to bad schema");
-    Assert.assertFalse(GobblinMCEWriter.isExceptionTransient(nonTransientException, transientExceptions));
+    Assert.assertFalse(GobblinMCEWriter.exceptionMatches(nonTransientException, transientExceptions));
     RejectedExecutionException rejectedExecutionException = new RejectedExecutionException("");
-    Assert.assertTrue(GobblinMCEWriter.isExceptionTransient(rejectedExecutionException, transientExceptions));
+    Assert.assertTrue(GobblinMCEWriter.exceptionMatches(rejectedExecutionException, transientExceptions));
   }

 @DataProvider(name="AllowMockMetadataWriter")
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 3e3db2dfd4..56511ff051 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -123,7 +123,7 @@ ext.externalDependency = [
     "guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
-    "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockito": "org.mockito:mockito-inline:4.11.0", // switched to the inline mock maker to allow mocking constructors, static and final methods; needed by the iceberg distcp tests
     "salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
     "salesforcePartner": "com.force.api:force-partner-api:" + salesforceVersion,
     "scala": "org.scala-lang:scala-library:2.11.8",
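On the `mockito-inline` swap: changing the artifact alone activates Mockito's inline mock maker at the same 4.11.0 version, unlocking construction, static, and final mocking. A self-contained sketch of the constructor-mocking API this enables (the demo class is hypothetical, not from this PR):

```java
import org.mockito.MockedConstruction;
import org.mockito.Mockito;

public class InlineMockDemo {
  static class Connector {  // stand-in for a concrete class under test
    String uri() { return "thrift://real:9083"; }
  }

  public static void main(String[] args) {
    // mockConstruction requires the inline mock maker (mockito-inline)
    try (MockedConstruction<Connector> mocked = Mockito.mockConstruction(Connector.class,
        (mock, context) -> Mockito.when(mock.uri()).thenReturn("thrift://mock:9083"))) {
      System.out.println(new Connector().uri()); // prints "thrift://mock:9083" inside this scope
    }
  }
}
```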