Skip to content

Commit

Permalink
[GOBBLIN-1853] Reduce # of Hive calls during schema related updates (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
homatthew authored Jul 18, 2023
1 parent ca48bcd commit c4a466b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@

package org.apache.gobblin.hive.writer;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -38,20 +28,37 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;

import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
Expand All @@ -68,10 +75,6 @@
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;


/**
* This writer is used to register the hiveSpec into hive metaStore
Expand Down Expand Up @@ -313,9 +316,15 @@ protected static boolean updateLatestSchemaMapWithExistingSchema(String dbName,
return false;
}

HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get();
latestSchemaMap.put(tableKey,
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
HiveTable table = hiveRegister.getTable(dbName, tableName).get();
String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
if (latestSchema == null) {
throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is "
+ "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should "
+ "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL));
}

latestSchemaMap.put(tableKey, latestSchema);
return true;
}

Expand Down Expand Up @@ -445,10 +454,14 @@ private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec,
return;
}
//Force to set the schema even there is no schema literal defined in the spec
if (latestSchemaMap.containsKey(tableKey)) {
spec.getTable().getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
String latestSchema = latestSchemaMap.get(tableKey);
if (latestSchema != null) {
String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
if (tableSchema == null || !tableSchema.equals(latestSchema)) {
spec.getTable().getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -112,6 +109,7 @@
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
Expand All @@ -122,13 +120,15 @@
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;

import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;

/**
Expand Down Expand Up @@ -493,13 +493,20 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
* @return table with updated schema and partition spec
*/
private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
boolean isTableUpdated = false;
if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
isTableUpdated = true;
}
if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSpec().addField(fieldName).commit();
isTableUpdated = true;
}

if (isTableUpdated) {
table.refresh();
}
table.refresh();

return table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,12 @@ public void testUpdateLatestSchemaWithExistingSchema() throws IOException {
));
Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed));
Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed));

HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class);
String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable";
Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral));
Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State());
Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
}

private String writeRecord(File file) throws IOException {
Expand Down

0 comments on commit c4a466b

Please sign in to comment.