Commit

Add dataset level timers and more logs in flush
jack-moseley committed Jun 22, 2023
1 parent 5362b9e commit ae7c9b1
Showing 3 changed files with 37 additions and 13 deletions.
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.hive.writer;
 
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -160,8 +161,9 @@ public void flush(String dbName, String tableName) throws IOException {
 HashMap<List<String>, ListenableFuture<Void>> executionMap = this.currentExecutionMap.get(tableKey);
 //iterator all execution to get the result to make sure they all succeeded
 for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
-try {
+try (Timer.Context context = new Timer().time()) {
 execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+log.info("Time taken to add partition to table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
 } catch (TimeoutException e) {
 // Since TimeoutException should always be a transient issue, throw RuntimeException which will fail/retry container
 throw new RuntimeException("Timeout waiting for result of registration for table " + tableKey, e);
@@ -177,7 +179,11 @@ public void flush(String dbName, String tableName) throws IOException {
 if (cache != null) {
 HiveSpec hiveSpec = cache.getIfPresent(execution.getKey());
 if (hiveSpec != null) {
-eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec, HivePartitionOperation.ADD_OR_MODIFY));
+try (Timer.Context context = new Timer().time()) {
+eventSubmitter.submit(buildCommitEvent(dbName, tableName, execution.getKey(), hiveSpec,
+HivePartitionOperation.ADD_OR_MODIFY));
+log.info("Time taken to submit event for table {} is {} ms", tableKey, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+}
 }
 }
 }
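For context on the timing idiom added above: Codahale Metrics' Timer.Context implements Closeable, so it can drive a try-with-resources block, and stop() returns the elapsed time in nanoseconds. A minimal, self-contained sketch (the class name, doWork, and the busy loop are illustrative only, not part of this commit):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

public class TimerContextExample {
  public static void main(String[] args) {
    // A throwaway Timer: only the elapsed time matters here, not the histogram it maintains.
    try (Timer.Context context = new Timer().time()) {
      doWork(); // stand-in for e.g. waiting on the registration future
      // stop() returns elapsed nanoseconds; close() calls stop() again, which is harmless for a throwaway timer.
      System.out.printf("work took %d ms%n", TimeUnit.NANOSECONDS.toMillis(context.stop()));
    }
  }

  private static void doWork() {
    // simulate some work
    for (long i = 0; i < 1_000_000; i++) { }
  }
}

This mirrors the pattern in the diff: the explicit stop() supplies the duration for the log line, and the try-with-resources guarantees the context is closed even if the timed call throws.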
@@ -131,12 +131,14 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
 private final Set<String> currentErrorDatasets = new HashSet<>();
 @Setter
 private int maxErrorDataset;
+private final MetricContext metricContext;
 protected EventSubmitter eventSubmitter;
 private final Set<String> transientExceptionMessages;
 private final Set<String> nonTransientExceptionMessages;
 private final Map<String, ContextAwareTimer> metadataWriterWriteTimers = new HashMap<>();
 private final Map<String, ContextAwareTimer> metadataWriterFlushTimers = new HashMap<>();
 private final ContextAwareTimer hiveSpecComputationTimer;
+private final Map<String, ContextAwareTimer> datasetTimers = new HashMap<>();
 
 @AllArgsConstructor
 static class TableStatus {
@@ -159,7 +161,7 @@ public GobblinMCEWriter(DataWriterBuilder<Schema, GenericRecord> builder, State
 List<Tag<?>> tags = Lists.newArrayList();
 String clusterIdentifier = ClustersNames.getInstance().getClusterName();
 tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
-MetricContext metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
+metricContext = Instrumented.getMetricContext(state, this.getClass(), tags);
 eventSubmitter = new EventSubmitter.Builder(metricContext, GOBBLIN_MCE_WRITER_METRIC_NAMESPACE).build();
 for (String className : state.getPropAsList(GMCE_METADATA_WRITER_CLASSES, IcebergMetadataWriter.class.getName())) {
 metadataWriters.add(closer.register(GobblinConstructorUtils.invokeConstructor(MetadataWriter.class, className, state)));
@@ -306,6 +308,9 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws IOException {
 String tableName = spec.getTable().getTableName();
 String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
 partitionKeysMap.put(tableString, spec.getTable().getPartitionKeys());
+if (!datasetTimers.containsKey(tableName)) {
+datasetTimers.put(tableName, metricContext.contextAwareTimer(tableName, 1, TimeUnit.HOURS));
+}
 if (!tableOperationTypeMap.containsKey(tableString)) {
 tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
 gmce.getDatasetIdentifier().getNativeName(), watermark.getSource(),
@@ -352,8 +357,10 @@ void writeWithMetadataWriters(
 writer.reset(dbName, tableName);
 } else {
 try {
-Timer timer = metadataWriterWriteTimers.get(writer.getClass().getName());
-try (Timer.Context context = timer.time()) {
+Timer writeTimer = metadataWriterWriteTimers.get(writer.getClass().getName());
+Timer datasetTimer = datasetTimers.get(tableName);
+try (Timer.Context writeContext = writeTimer.time();
+Timer.Context datasetContext = datasetTimer.time()) {
 writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
 }
 } catch (Exception e) {
@@ -433,8 +440,10 @@ private void flush(String dbName, String tableName) throws IOException {
 writer.reset(dbName, tableName);
 } else {
 try {
-Timer timer = metadataWriterFlushTimers.get(writer.getClass().getName());
-try (Timer.Context context = timer.time()) {
+Timer flushTimer = metadataWriterFlushTimers.get(writer.getClass().getName());
+Timer datasetTimer = datasetTimers.get(tableName);
+try (Timer.Context flushContext = flushTimer.time();
+Timer.Context datasetContext = datasetTimer.time()) {
 writer.flush(dbName, tableName);
 }
 } catch (IOException e) {
@@ -585,9 +594,10 @@ private List<String> getFailedWriterList(MetadataWriter failedWriter) {
 }
 
 private void logTimers() {
-logTimer(hiveSpecComputationTimer);
 metadataWriterWriteTimers.values().forEach(this::logTimer);
 metadataWriterFlushTimers.values().forEach(this::logTimer);
+logTimer(hiveSpecComputationTimer);
+datasetTimers.values().forEach(this::logTimer);
 }
 
 private void logTimer(ContextAwareTimer timer) {
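The datasetTimers map above gives each table its own ContextAwareTimer, created lazily from the shared MetricContext with a one-hour sliding window. A rough sketch of that pattern, assuming the Gobblin metrics API exactly as it is used in this diff; the DatasetTimers class and its methods are hypothetical helpers, not part of the commit:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

import org.apache.gobblin.metrics.ContextAwareTimer;
import org.apache.gobblin.metrics.MetricContext;

// Hypothetical helper: one timer per dataset (table), registered on first use.
class DatasetTimers {
  private final Map<String, ContextAwareTimer> timers = new ConcurrentHashMap<>();
  private final MetricContext metricContext;

  DatasetTimers(MetricContext metricContext) {
    this.metricContext = metricContext;
  }

  // Lazily register a timer named after the table, backed by a 1-hour sliding time window.
  ContextAwareTimer forTable(String tableName) {
    return timers.computeIfAbsent(tableName,
        name -> metricContext.contextAwareTimer(name, 1, TimeUnit.HOURS));
  }

  // Time an arbitrary piece of work (e.g. a metadata write or flush) against the table's timer.
  void time(String tableName, Runnable work) {
    try (Timer.Context ignored = forTable(tableName).time()) {
      work.run();
    }
  }
}

The commit itself populates a plain HashMap from writeEnvelope and looks the timer up again in writeWithMetadataWriters and flush; the concurrent map and helper methods here are only conveniences for a standalone example.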
@@ -826,7 +826,10 @@ public void flush(String dbName, String tableName) throws IOException {
 String topicName = getTopicName(tid, tableMetadata);
 if (tableMetadata.appendFiles.isPresent()) {
 tableMetadata.appendFiles.get().commit();
-sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+try (Timer.Context context = new Timer().time()) {
+sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+log.info("Sending audit counts for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+}
 if (tableMetadata.completenessEnabled) {
 checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
 }
@@ -869,20 +872,25 @@ public void flush(String dbName, String tableName) throws IOException {
 UpdateProperties updateProperties = transaction.updateProperties();
 props.forEach(updateProperties::set);
 updateProperties.commit();
-try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
+try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName);
+Timer.Context context = new Timer().time()) {
 transaction.commitTransaction();
+log.info("Committing transaction for table {} took {} ms", tid, TimeUnit.NANOSECONDS.toMillis(context.stop()));
 }
 
 // Emit GTE for snapshot commits
 Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
 Map<String, String> currentProps = tableMetadata.table.get().properties();
-submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
+try (Timer.Context context = new Timer().time()) {
+submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
+log.info("Sending snapshot commit event for {} took {} ms", topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
+}
 
 //Reset the table metadata for next accumulation period
 tableMetadata.reset(currentProps, highWatermark);
-log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
+log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid));
 } else {
-log.info("There's no transaction initiated for the table {}", tid.toString());
+log.info("There's no transaction initiated for the table {}", tid);
 }
 } catch (RuntimeException e) {
 throw new IOException(String.format("Fail to flush table %s %s", dbName, tableName), e);
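The transaction-commit change above stacks the Hive table lock and a timer in a single try-with-resources; the timer is started only after the lock has been acquired, so lock wait time is excluded from the measurement, and resources close in reverse declaration order, so the timer stops before the lock is released. A small sketch of that shape, with hypothetical stand-ins for the lock and the Iceberg transaction:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.codahale.metrics.Timer;

public class LockedCommitTiming {
  private static final ReentrantLock tableLock = new ReentrantLock(); // stand-in for the Hive table lock

  public static void main(String[] args) {
    // Lock first, then start the timer, so lock acquisition time is not counted.
    try (AutoCloseable lock = acquire(tableLock);
         Timer.Context context = new Timer().time()) {
      commitTransaction(); // stand-in for transaction.commitTransaction()
      System.out.printf("commit took %d ms%n", TimeUnit.NANOSECONDS.toMillis(context.stop()));
    } catch (Exception e) {
      throw new RuntimeException("commit failed", e);
    }
  }

  private static AutoCloseable acquire(ReentrantLock lock) {
    lock.lock();
    return lock::unlock; // released when the try block exits
  }

  private static void commitTransaction() {
    // pretend to do the Iceberg commit
  }
}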
