Merge remote-tracking branch 'upstream/master'
* upstream/master:
  [GOBBLIN-1807] Replaces conjars.org with conjars.wensel.net (apache#3668)
  [GOBBLIN-1802] Register iceberg table metadata update with destination side catalog (apache#3663)
  Add matching of non-transient exceptions that will avoid failing the container in GMIP (apache#3662)
phet committed Apr 11, 2023
2 parents e6a3bf1 + 0d69363 commit 4e4f95a
Showing 14 changed files with 241 additions and 87 deletions.
16 changes: 15 additions & 1 deletion defaultEnvironment.gradle
@@ -25,8 +25,22 @@ subprojects {
maven {
url "https://repository.cloudera.com/artifactory/cloudera-repos/"
}

+    // Conjars is a read only repository that hosts older artifacts, necessary for hive 1.0.1 and hadoop 2.10
maven {
url "http://conjars.org/repo"
url "https://conjars.wensel.net/repo/"
+      content {
+        // Required for:
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > net.hydromatic:linq4j:0.4
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > net.hydromatic:quidem:0.1.1
+        includeGroup "net.hydromatic"
+        // Required for:
+        // com.linkedin.hive:hive-exec:1.0.1-avro > org.apache.calcite:calcite-core:0.9.2-incubating > eigenbase:eigenbase-properties:1.1.4.
+        includeGroup "eigenbase"
+        // Required for:
+        // org.apache.hadoop:hadoop-common:2.10.0 > org.apache.hadoop:hadoop-auth:2.10.0 > com.nimbusds:nimbus-jose-jwt:4.41.1 > net.minidev:json-smart:[1.3.1,2.3]
+        includeGroup "net.minidev"
+      }
}
}

3 changes: 0 additions & 3 deletions gobblin-admin/build.gradle
@@ -19,9 +19,6 @@ apply plugin: 'java'

repositories {
mavenCentral()
-    maven {
-        url "http://conjars.org/repo"
-    }
}

dependencies {
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
IcebergCatalog.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;


@@ -28,4 +29,5 @@ public interface IcebergCatalog {
IcebergTable openTable(String dbName, String tableName);
String getCatalogUri();
void initialize(Map<String, String> properties, Configuration configuration);
+  boolean tableAlreadyExists(IcebergTable icebergTable);
}
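
The new method splits obtaining a table handle from verifying that the table actually exists in the catalog: openTable(...) returns a handle without proving existence, so callers are expected to guard it. A minimal caller-side sketch (hypothetical helper, not part of this commit):

    static IcebergTable openExisting(IcebergCatalog catalog, String db, String tbl) {
      IcebergTable table = catalog.openTable(db, tbl); // handle only; does not prove existence
      if (!catalog.tableAlreadyExists(table)) { // the method added by this commit
        throw new IllegalArgumentException(String.format("Missing Iceberg table: %s.%s", db, tbl));
      }
      return table;
    }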
IcebergDataset.java
@@ -48,6 +48,7 @@
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,21 @@
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  /** Presumed destination {@link IcebergTable} exists */
+  private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired

-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
-  /** Target database name */
-  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+  /** Destination database name */
+  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";

-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.destIcebergTable = destIcebergTable;
this.properties = properties;
this.sourceFs = sourceFs;
}
@@ -154,6 +155,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
+    copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable));
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}
@@ -163,7 +165,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
* @return a map of path, file status for each file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
-    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files should be immutable
@@ -307,10 +309,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}

protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}

protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.destIcebergTable.getDatasetDescriptor(targetFs);
}

+  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+    return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
+  }
}
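
Net effect of the IcebergDataset changes: the dataset now carries a source and a destination table handle, and generateCopyEntities(...) appends exactly one PostPublishStep, so catalog registration happens only after every data file is published. A hedged wiring sketch (catalog and filesystem setup assumed, names illustrative):

    IcebergTable src = sourceCatalog.openTable("db", "tbl");
    IcebergTable dest = destCatalog.openTable("db", "tbl");
    IcebergDataset dataset = new IcebergDataset("db", "tbl", src, dest, props, sourceFs);
    // generateCopyEntities(...) emits the per-file copy entities plus one PostPublishStep
    // (an IcebergRegisterStep, priority 0) that registers src's metadata at the destination.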
IcebergDatasetFinder.java
@@ -25,34 +25,45 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;


/**
* Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
* and creates a {@link IcebergDataset} for each one.
*/
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {

public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
+  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
+  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
+  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

+  public enum CatalogLocation {
+    SOURCE,
+    DESTINATION
+  }

protected final FileSystem sourceFs;
private final Properties properties;
@@ -74,18 +85,13 @@ public List<IcebergDataset> findDatasets() throws IOException {
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

-    try {
-      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-      /* Each Iceberg dataset maps to an Iceberg table
-       * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-       */
-      matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-      log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
-          matchingDatasets, dbName, tblName);
-      return matchingDatasets;
-    } catch (ReflectiveOperationException exception) {
-      throw new IOException(exception);
-    }
+    IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    /* Each Iceberg dataset maps to an Iceberg table */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
+        matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one
+    return matchingDatasets;
  }

@Override
@@ -98,20 +104,44 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
return findDatasets().iterator();
}

-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
-  }
+  /**
+   * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
+    // TODO: Rethink strategy to enforce dest iceberg table
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
+  }

-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
    Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
    Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      default:
+        throw new UnsupportedOperationException(String.format("Incorrect desired location: %s provided for creating Iceberg Catalog", location));
+    }
+    catalogProperties.put(CatalogProperties.URI, catalogUri);
    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
  }
}
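
For orientation, a hedged sketch of the job properties the reworked finder reads; the keys are the constants defined above (note that only the destination catalog URI key carries an extra ".copy." segment), while the URI and cluster values here are placeholders:

    Properties props = new Properties();
    props.setProperty(IcebergDatasetFinder.ICEBERG_DB_NAME, "db");
    props.setProperty(IcebergDatasetFinder.ICEBERG_TABLE_NAME, "tbl");
    props.setProperty(IcebergDatasetFinder.ICEBERG_SRC_CATALOG_URI_KEY, "thrift://source-metastore:9083");
    props.setProperty(IcebergDatasetFinder.ICEBERG_DEST_CATALOG_URI_KEY, "thrift://destination-metastore:9083");
    // optional, for catalogs that need cluster-specific properties:
    props.setProperty(IcebergDatasetFinder.ICEBERG_SRC_CLUSTER_NAME, "source-cluster");
    props.setProperty(IcebergDatasetFinder.ICEBERG_DEST_CLUSTER_NAME, "destination-cluster");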
IcebergHiveCatalog.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

import lombok.extern.slf4j.Slf4j;


@@ -47,11 +49,16 @@ public void initialize(Map<String, String> properties, Configuration configurati

@Override
public String getCatalogUri() {
-    return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+    return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<<not set>>");
}

@Override
protected TableOperations createTableOperations(TableIdentifier tableId) {
return hc.newTableOps(tableId);
}

+  @Override
+  public boolean tableAlreadyExists(IcebergTable icebergTable) {
+    return hc.tableExists(icebergTable.getTableId());
+  }
}
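
Why the getCatalogUri() change: HiveCatalog keeps its metastore endpoint in its Hadoop Configuration under "hive.metastore.uris" (HiveConf.ConfVars.METASTOREURIS), while CatalogProperties.URI ("uri") is only the Iceberg-side initialization key and is not echoed back into that Configuration, so the old lookup would fall through to the "<<not set>>" default. A hedged illustration, assuming an initialized HiveCatalog hc:

    String stale = hc.getConf().get(CatalogProperties.URI, "<<not set>>"); // "uri" absent here: "<<not set>>"
    String actual = hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<<not set>>"); // e.g. "thrift://host:9083"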
IcebergRegisterStep.java (new file)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;

import org.apache.iceberg.TableMetadata;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;

/**
* {@link CommitStep} to perform Iceberg registration.
*/
@Slf4j
@AllArgsConstructor
public class IcebergRegisterStep implements CommitStep {

private final IcebergTable srcIcebergTable;
private final IcebergTable destIcebergTable;

@Override
public boolean isCompleted() throws IOException {
return false;
}

@Override
public void execute() throws IOException {
TableMetadata destinationMetadata = null;
try {
destinationMetadata = this.destIcebergTable.accessTableMetadata();
} catch (IcebergTable.TableNotFoundException tnfe) {
      log.warn("Destination TableMetadata doesn't exist because: ", tnfe);
}
this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
}
}
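
Reading the new step: isCompleted() unconditionally returns false, so the commit framework never skips it, and a missing destination table is tolerated (logged, then null is passed through, which makes registration a no-op downstream). A hedged sketch of the CommitStep contract this class implements (table handles assumed):

    CommitStep step = new IcebergRegisterStep(srcTable, destTable);
    if (!step.isCompleted()) { // always false for this step, so registration is always attempted
      step.execute(); // an IOException here fails the publish
    }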
IcebergTable.java
@@ -26,10 +26,6 @@
import java.util.Set;
import java.util.stream.Collectors;

-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;

@@ -67,7 +67,7 @@ public TableNotFoundException(TableIdentifier tableId) {
this.tableId = tableId;
    }
  }
-
+  @Getter
private final TableIdentifier tableId;
private final TableOperations tableOps;
private final String catalogUri;
@@ -194,4 +194,11 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
}
+
+  /** Registers {@link IcebergTable} after publishing data.
+   * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
+  protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
+    if (dstMetadata != null) {
+      this.tableOps.commit(dstMetadata, srcMetadata);
+    }
+  }
}
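
Registration leans on the check-and-swap contract of Iceberg's TableOperations.commit(base, metadata): the destination's current metadata is passed as the expected base and the source's metadata as the replacement, so the commit fails rather than clobbering if the destination moved after being read. A hedged sketch of that contract (hypothetical TableOperations handles):

    TableMetadata base = destTableOps.refresh(); // destination metadata as last observed
    TableMetadata replacement = srcTableOps.current(); // source snapshot to register
    // atomic swap: succeeds only if the destination is still at `base`,
    // otherwise Iceberg throws CommitFailedException instead of overwriting
    destTableOps.commit(base, replacement);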