[GOBBLIN-1842] Add timers to GobblinMCEWriter #3703

Merged (3 commits) on Jun 28, 2023
Changes from 2 commits
@@ -17,6 +17,7 @@

package org.apache.gobblin.hive.writer;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -160,8 +161,9 @@ public void flush(String dbName, String tableName) throws IOException {
HashMap<List<String>, ListenableFuture<Void>> executionMap = this.currentExecutionMap.get(tableKey);
//iterate over all executions to get their results and make sure they all succeeded
for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
try {
try (Timer.Context context = new Timer().time()) {
execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
log.info("Time taken to add partition to table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
} catch (TimeoutException e) {
// Since TimeoutException should always be a transient issue, throw RuntimeException which will fail/retry container
throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e);
@@ -177,7 +179,11 @@ public void flush(String dbName, String tableName) throws IOException {
if (cache != null) {
HiveSpec hiveSpec = cache.getIfPresent(execution.getKey());
if (hiveSpec != null) {
eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec, HivePartitionOperation.ADD_OR_MODIFY));
try (Timer.Context context = new Timer().time()) {
eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec,
HivePartitionOperation.ADD_OR_MODIFY));
log.info("Time taken to submit event for table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
}
}
}
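The hunk above times the blocking get(...) on each registration future and each commit-event submission with a throwaway Dropwizard Timer used purely as a stopwatch: Timer.Context is Closeable, and stop() returns the elapsed time in nanoseconds, which the new log lines convert to milliseconds. A minimal standalone sketch of that pattern (the class name and the sleep are illustrative only):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

public class TimerContextSketch {
  public static void main(String[] args) throws InterruptedException {
    // A fresh Timer acts as a stopwatch; its recorded samples are simply discarded.
    try (Timer.Context context = new Timer().time()) {
      Thread.sleep(50); // stand-in for waiting on the registration future

      // stop() records a sample and returns the elapsed time in nanoseconds.
      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(context.stop());
      System.out.println("Registration took " + elapsedMs + " ms");
    } // close() stops the context a second time, which is harmless for a throwaway Timer
  }
}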
@@ -38,6 +38,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.commons.collections.CollectionUtils;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;

import com.google.common.base.Joiner;
@@ -69,6 +71,7 @@
import org.apache.gobblin.metadata.DataFile;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -128,9 +131,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
private final Set<String> currentErrorDatasets = new HashSet<>();
@Setter
private int maxErrorDataset;
private final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
private final Set<String> transientExceptionMessages;
private final Set<String> nonTransientExceptionMessages;
private final Map<String, ContextAwareTimer> metadataWriterWriteTimers = new HashMap<>();
private final Map<String, ContextAwareTimer> metadataWriterFlushTimers = new HashMap<>();
private final ContextAwareTimer hiveSpecComputationTimer;
private final Map<String, ContextAwareTimer> datasetTimers = new HashMap<>();

@AllArgsConstructor
static class TableStatus {
@@ -150,19 +158,22 @@ public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State
acceptedClusters = properties.getPropAsSet(ACCEPTED_CLUSTER_NAMES, ClustersNames.getInstance().getClusterName());
state = properties;
maxErrorDataset = state.getPropAsInt(GMCE_METADATA_WRITER_MAX_ERROR_DATASET, DEFUALT_GMCE_METADATA_WRITER_MAX_ERROR_DATASET);
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state)));
metadataWriterWriteTimers.put(className, metricContext.contextAwareTimer(className + ".write", 1, TimeUnit.HOURS));
metadataWriterFlushTimers.put(className, metricContext.contextAwareTimer(className + ".flush", 1, TimeUnit.HOURS));
}
hiveSpecComputationTimer = metricContext.contextAwareTimer("hiveSpec.computation", 1, TimeUnit.HOURS);
tableOperationTypeMap = new HashMap<>();
parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(METADATA_REGISTRATION_THREADS, 20),
FileSystem.get(HadoopUtils.getConfFromState(properties))));
parallelRunnerTimeoutMills =
state.getPropAsInt(METADATA_PARALLEL_RUNNER_TIMEOUT_MILLS, DEFAULT_ICEBERG_PARALLEL_TIMEOUT_MILLS);
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
transientExceptionMessages = new HashSet<>(properties.getPropAsList(TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
nonTransientExceptionMessages = new HashSet<>(properties.getPropAsList(NON_TRANSIENT_EXCEPTION_MESSAGES_KEY, ""));
}
@@ -187,26 +198,28 @@ public void write(GenericRecord record) throws IOException {
*/
private void computeSpecMap(List<String> files, ConcurrentHashMap<String, Collection<HiveSpec>> specsMap,
Cache<String, Collection<HiveSpec>> cache, State registerState, boolean isPrefix) throws IOException {
HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(registerState);
for (String file : files) {
parallelRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();
//Use raw path to comply with HDFS federation setting.
Path rawPath = new Path(regPath.toUri().getRawPath());
specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath)));
} catch (Throwable e) {
//todo: Emit a failure GMCE in the future to make error GMCEs easy to track and investigate.
log.warn("Cannot get Hive Spec for {} using policy {} due to:", file, policy.toString());
log.warn(e.getMessage());
try (Timer.Context context = hiveSpecComputationTimer.time()) {
HiveRegistrationPolicy policy = HiveRegistrationPolicyBase.getPolicy(registerState);
for (String file : files) {
parallelRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
Path regPath = isPrefix ? new Path(file) : new Path(file).getParent();
//Use raw path to comply with HDFS federation setting.
Path rawPath = new Path(regPath.toUri().getRawPath());
specsMap.put(regPath.toString(), cache.get(regPath.toString(), () -> policy.getHiveSpecs(rawPath)));
} catch (Throwable e) {
//todo: Emit a failure GMCE in the future to make error GMCEs easy to track and investigate.
log.warn("Cannot get Hive Spec for {} using policy {} due to:", file, policy.toString());
log.warn(e.getMessage());
}
return null;
}
return null;
}
}, file);
}, file);
}
parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
}
parallelRunner.waitForTasks(parallelRunnerTimeoutMills);
}

@Override
@@ -295,6 +308,9 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws I
String tableName = spec.getTable().getTableName();
String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys());
if (!datasetTimers.containsKey(tableName)) {
datasetTimers.put(tableName, metricContext.contextAwareTimer(tableName, 1, TimeUnit.HOURS));
}
if (!tableOperationTypeMap.containsKey(tableString)) {
tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
@@ -341,7 +357,12 @@ void writeWithMetadataWriters(
writer.reset(dbName, tableName);
} else {
try {
writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
Timer writeTimer = metadataWriterWriteTimers.get(writer.getClass().getName());
Timer datasetTimer = datasetTimers.get(tableName);
try (Timer.Context writeContext = writeTimer.time();
Timer.Context datasetContext = datasetTimer.time()) {
writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
}
} catch (Exception e) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
@@ -419,7 +440,12 @@ private void flush(String dbName, String tableName) throws IOException {
writer.reset(dbName, tableName);
} else {
try {
writer.flush(dbName, tableName);
Timer flushTimer = metadataWriterFlushTimers.get(writer.getClass().getName());
Timer datasetTimer = datasetTimers.get(tableName);
try (Timer.Context flushContext = flushTimer.time();
Timer.Context datasetContext = datasetTimer.time()) {
writer.flush(dbName, tableName);
}
} catch (IOException e) {
if (exceptionMatches(e, transientExceptionMessages)) {
throw new RuntimeException("Failing container due to transient exception for db: " + dbName + " table: " + tableName, e);
@@ -480,6 +506,7 @@ public void flush() throws IOException {
}
entry.getValue().clear();
}
logTimers();
}

@Override
@@ -565,4 +592,16 @@ private List<String> getFailedWriterList(MetadataWriter failedWriter) {
List<MetadataWriter> failedWriters = metadataWriters.subList(metadataWriters.indexOf(failedWriter), metadataWriters.size());
return failedWriters.stream().map(writer -> writer.getClass().getName()).collect(Collectors.toList());
}

private void logTimers() {
logTimer(hiveSpecComputationTimer);
metadataWriterWriteTimers.values().forEach(this::logTimer);
metadataWriterFlushTimers.values().forEach(this::logTimer);
datasetTimers.values().forEach(this::logTimer);
}

private void logTimer(ContextAwareTimer timer) {
log.info("Timer {} 1 hour mean duration: {} ms", timer.getName(), TimeUnit.NANOSECONDS.toMillis((long) timer.getSnapshot().getMean()));
log.info("Timer {} 1 hour 99th percentile duration: {} ms", timer.getName(), TimeUnit.NANOSECONDS.toMillis((long) timer.getSnapshot().get99thPercentile()));
}
}
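The ContextAwareTimer instances registered in the constructor above are created via metricContext.contextAwareTimer(name, 1, TimeUnit.HOURS), so the mean and 99th-percentile values printed by logTimer describe roughly the last hour of samples. A rough plain-Dropwizard sketch of the same idea, assuming those timers are backed by a sliding-time-window reservoir (the class name and the timed block are illustrative only):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

public class SlidingWindowTimerSketch {
  public static void main(String[] args) {
    // A timer whose snapshot only covers samples recorded in the last hour.
    Timer writeTimer = new Timer(new SlidingTimeWindowReservoir(1, TimeUnit.HOURS));

    try (Timer.Context context = writeTimer.time()) {
      // stand-in for writer.writeEnvelope(...) or writer.flush(...)
    }

    // Mirrors logTimer(): snapshot values are in nanoseconds, converted to milliseconds.
    Snapshot snapshot = writeTimer.getSnapshot();
    System.out.printf("mean: %d ms, p99: %d ms%n",
        TimeUnit.NANOSECONDS.toMillis((long) snapshot.getMean()),
        TimeUnit.NANOSECONDS.toMillis((long) snapshot.get99thPercentile()));
  }
}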
@@ -826,7 +826,10 @@ public void flush(String dbName, String tableName) throws IOException {
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
try (Timer.Context context = new Timer().time()) {
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
}
@@ -869,20 +872,25 @@ public void flush(String dbName, String tableName) throws IOException {
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
updateProperties.commit();
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
Timer.Context context = new Timer().time()) {
transaction.commitTransaction();
log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

// Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
try (Timer.Context context = new Timer().time()) {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
}

//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
} else {
log.info("There's no transaction initiated for the table {}", tid.toString());
log.info("There's no transaction initiated for the table {}", tid);
}
} catch (RuntimeException e) {
throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
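One detail worth noting in the lock-plus-timer hunk above: resources in a try-with-resources block are initialized in declaration order, so declaring the Hive table lock before the Timer.Context means the timer starts only after the lock is acquired, and context.stop() right after commitTransaction() measures the commit alone. A small sketch of that ordering, using a hypothetical stand-in for AutoCloseableHiveLock:

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

public class LockThenTimeSketch {
  // Hypothetical stand-in for AutoCloseableHiveLock; only the ordering matters here.
  static class FakeLock implements AutoCloseable {
    FakeLock() { System.out.println("lock acquired"); }
    @Override public void close() { System.out.println("lock released"); }
  }

  public static void main(String[] args) {
    try (FakeLock lock = new FakeLock();               // acquired first
         Timer.Context context = new Timer().time()) { // timer starts after acquisition
      // stand-in for transaction.commitTransaction()
      System.out.println("commit took "
          + TimeUnit.NANOSECONDS.toMillis(context.stop()) + " ms");
    } // resources close in reverse order: timer context first, then the lock
  }
}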