Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BFD-3700: Pipeline submits its own per-RIF and per-manifest metrics to CloudWatch #2514

Merged
merged 9 commits into from
Jan 14, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import gov.cms.bfd.model.rda.MessageError;
import gov.cms.bfd.model.rif.RifFileType;
import gov.cms.bfd.model.rif.RifRecordEvent;
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob;
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadOptions;
import gov.cms.bfd.pipeline.ccw.rif.extract.ExtractionOptions;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest;
Expand Down Expand Up @@ -373,7 +374,13 @@ public final class AppConfiguration extends BaseAppConfiguration {
* auto-generated aggregate metric names with suffixes like {@code .avg}.
*/
public static final Set<String> MICROMETER_CW_ALLOWED_METRIC_NAMES =
Set.of("FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis");
Set.of(
"FissClaimRdaSink.change.latency.millis",
"McsClaimRdaSink.change.latency.millis",
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME,
CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME,
DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_TOTAL_TIMER_NAME);

/**
* The CCW rif load options. This can be null if the CCW job is not configured, Optional is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob;
import gov.cms.bfd.pipeline.ccw.rif.extract.RifFileRecords;
import gov.cms.bfd.pipeline.ccw.rif.extract.RifFilesProcessor;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetMonitorListener;
import gov.cms.bfd.pipeline.ccw.rif.extract.s3.S3RifFile;
import gov.cms.bfd.pipeline.ccw.rif.load.RifLoader;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +37,9 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen
/** Metrics for this class. */
private final MetricRegistry appMetrics;

/** Micrometer metrics for this class. */
private final Metrics metrics;

/** Handles processing of new RIF files. */
private final RifFilesProcessor rifProcessor;

Expand All @@ -40,14 +50,19 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen
* Initializes the instance.
*
* @param appMetrics the {@link MetricRegistry} for the application
* @param micrometerMetrics the {@link MeterRegistry} for the application
* @param rifProcessor the {@link RifFilesProcessor} for the application
* @param rifLoader the {@link RifLoader} for the application
*/
DefaultDataSetMonitorListener(
MetricRegistry appMetrics, RifFilesProcessor rifProcessor, RifLoader rifLoader) {
MetricRegistry appMetrics,
MeterRegistry micrometerMetrics,
RifFilesProcessor rifProcessor,
RifLoader rifLoader) {
this.appMetrics = appMetrics;
this.rifProcessor = rifProcessor;
this.rifLoader = rifLoader;
this.metrics = new Metrics(micrometerMetrics);
}

@Override
Expand All @@ -66,6 +81,10 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build();
dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES);

final LongTaskTimer.Sample activeTimer = metrics.createActiveTimerForRif(rifFile).start();
final io.micrometer.core.instrument.Timer.Sample totalTimer =
io.micrometer.core.instrument.Timer.start();

try {
LOGGER.info("Processing file {}", rifFile.getDisplayName());
rifFile.markAsStarted();
Expand All @@ -82,6 +101,9 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
failure = e;
}

activeTimer.stop();
totalTimer.stop(metrics.createTotalTimerForRif(rifFile));

dataSetFileMetricsReporter.stop();
dataSetFileMetricsReporter.report();

Expand All @@ -104,4 +126,110 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception {
public void noDataAvailable() {
// Intentionally empty: this listener takes no action in response to this callback.
}

/**
 * Micrometer metrics for the {@link DefaultDataSetMonitorListener}'s operations. Holds the names
 * of the per-{@link RifFile} timers and the factory methods that register those timers, tagged
 * with details of the {@link RifFile} being processed, in the supplied {@link MeterRegistry}.
 */
@RequiredArgsConstructor
public static final class Metrics {
  /**
   * Name of the per-{@link RifFile} data processing {@link LongTaskTimer}s that actively, at each
   * Micrometer reporting interval, record and report the duration of processing of a given
   * {@link RifFile}.
   *
   * @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of
   *     {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics
   *     generated from the {@link CcwRifLoadJob}. Additionally, {@link
   *     DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob}
   */
  public static final String RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME =
      String.format("%s.rif_file_processing.active", CcwRifLoadJob.class.getSimpleName());

  /**
   * Name of the per-{@link RifFile} data processing {@link io.micrometer.core.instrument.Timer}s
   * that report the final duration of processing once the {@link RifFile} is processed.
   *
   * @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of
   *     {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics
   *     generated from the {@link CcwRifLoadJob}. Additionally, {@link
   *     DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob}
   */
  public static final String RIF_FILE_PROCESSING_TOTAL_TIMER_NAME =
      String.format("%s.rif_file_processing.total", CcwRifLoadJob.class.getSimpleName());

  /**
   * Tag indicating which data set (identified by its timestamp in S3) a given metric measured.
   */
  private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";

  /** Tag indicating which RIF file a given metric measured. */
  private static final String TAG_RIF_FILE = "rif_file";

  /**
   * Tag indicating whether the data load associated with the measured metric was synthetic or
   * not.
   */
  private static final String TAG_IS_SYNTHETIC = "is_synthetic";

  /** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */
  private static final String TAG_MANIFEST = "manifest";

  /** Micrometer {@link MeterRegistry} for the Pipeline application. */
  private final MeterRegistry micrometerMetrics;

  /**
   * Creates a {@link LongTaskTimer} for a given {@link RifFile} so that the time it takes to
   * process the RIF can be measured while processing is ongoing. Should be called prior to
   * processing a {@link RifFile}.
   *
   * @param rifFile the {@link RifFile} to time
   * @return the {@link LongTaskTimer} that will be used to measure the ongoing load time of the
   *     {@link RifFile}
   */
  LongTaskTimer createActiveTimerForRif(RifFile rifFile) {
    return LongTaskTimer.builder(RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME)
        .tags(getTags(rifFile))
        .register(micrometerMetrics);
  }

  /**
   * Creates a {@link io.micrometer.core.instrument.Timer} for a given {@link RifFile} so that the
   * total time it takes to process the RIF can be recorded once a {@link RifFile} is done
   * processing. Should be used with {@link
   * io.micrometer.core.instrument.Timer.Sample#stop(io.micrometer.core.instrument.Timer)} after
   * processing a {@link RifFile} to record the total duration.
   *
   * @param rifFile the {@link RifFile} to time
   * @return the {@link io.micrometer.core.instrument.Timer} that will be used to record the total
   *     time taken to load the {@link RifFile}
   */
  io.micrometer.core.instrument.Timer createTotalTimerForRif(RifFile rifFile) {
    return io.micrometer.core.instrument.Timer.builder(RIF_FILE_PROCESSING_TOTAL_TIMER_NAME)
        .tags(getTags(rifFile))
        .register(micrometerMetrics);
  }

  /**
   * Returns the {@link List} of {@link Tag}s used to disambiguate a metric for a given {@link
   * RifFile}. The {@link #TAG_RIF_FILE} tag (the lower-cased name of the file's type) is always
   * present. When the file is an {@link S3RifFile}, the {@link #TAG_DATA_SET_TIMESTAMP}, {@link
   * #TAG_IS_SYNTHETIC} and {@link #TAG_MANIFEST} tags are added from its parent {@link
   * DataSetManifest}.
   *
   * @param rifFile {@link RifFile} from which several properties will be used to set relevant
   *     {@link Tag}s
   * @return a {@link List} of {@link Tag}s including relevant information from {@code rifFile}
   */
  private List<Tag> getTags(RifFile rifFile) {
    // Locale.ROOT keeps the tag value independent of the JVM's default locale. The no-argument
    // toLowerCase() would, for example, turn 'I' into a dotless 'ı' under a Turkish locale and
    // silently change the metric's dimension value.
    final var rifFileTag =
        Tag.of(TAG_RIF_FILE, rifFile.getFileType().name().toLowerCase(java.util.Locale.ROOT));
    if (rifFile instanceof S3RifFile s3RifFile) {
      final var manifest = s3RifFile.getManifestEntry().getParentManifest();
      final var manifestFullpath = manifest.getIncomingS3Key();
      // Keep only the text after the last '/'. If the key contains no '/', lastIndexOf returns
      // -1 and the whole key is used.
      final var manifestFilename =
          manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
      return List.of(
          Tag.of(TAG_DATA_SET_TIMESTAMP, manifest.getTimestampText()),
          Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
          rifFileTag,
          Tag.of(TAG_MANIFEST, manifestFilename));
    }

    // No manifest is reachable from other RifFile implementations, so only the file type is
    // tagged.
    return List.of(rifFileTag);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,8 @@ private PipelineJob createCcwRifLoadJob(
* each data set that is found.
*/
DataSetMonitorListener dataSetMonitorListener =
new DefaultDataSetMonitorListener(appState.getMetrics(), rifProcessor, rifLoader);
new DefaultDataSetMonitorListener(
appState.getMetrics(), appState.getMeters(), rifProcessor, rifLoader);
var s3Factory = new AwsS3ClientFactory(loadOptions.getExtractionOptions().getS3ClientConfig());
// Tell SQ it's ok not to use try-finally here since this will be closed by the CcwRifLoadJob.
@SuppressWarnings("java:S2095")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome;
import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule;
import gov.cms.bfd.sharedutils.exceptions.BadCodeMonkeyException;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -31,6 +35,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.RequiredArgsConstructor;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -157,6 +162,9 @@ public final class CcwRifLoadJob implements PipelineJob {
/** The application metrics. */
private final MetricRegistry appMetrics;

/** Metrics for operations within this job. */
private final Metrics loadJobMetrics;

/** The extraction options. */
private final ExtractionOptions options;

Expand Down Expand Up @@ -210,6 +218,7 @@ public CcwRifLoadJob(
this.runInterval = runInterval;
this.statusReporter = statusReporter;
downloadService = Executors.newSingleThreadScheduledExecutor();
loadJobMetrics = new Metrics(appState.getMeters());
}

@Override
Expand Down Expand Up @@ -348,11 +357,16 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
final var activeTimer =
loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
final var totalTimer = Timer.start();
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
activeTimer.stop();
totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);

/*
Expand Down Expand Up @@ -527,4 +541,92 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) {
dataFileRecord.getFileName());
return false;
}

/** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */
@RequiredArgsConstructor
public static final class Metrics {

/**
* Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each
* Micrometer reporting interval, records and reports the duration of processing of a given
* {@link DataSetManifest}.
*/
public static final String MANIFEST_PROCESSING_ACTIVE_TIMER_NAME =
String.format("%s.manifest_processing.active", CcwRifLoadJob.class.getSimpleName());

/**
* Name of the per-{@link DataSetManifest} {@link Timer}s that report the final duration of
* processing once the {@link DataSetManifest} is processed.
*/
public static final String MANIFEST_PROCESSING_TOTAL_TIMER_NAME =
String.format("%s.manifest_processing.total", CcwRifLoadJob.class.getSimpleName());

/**
* Tag indicating which data set (identified by its timestamp in S3) a given metric measured.
*/
private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp";

/**
* Tag indicating whether the data load associated with the measured metric was synthetic or
* not.
*/
private static final String TAG_IS_SYNTHETIC = "is_synthetic";

/** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */
private static final String TAG_MANIFEST = "manifest";

/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;

/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
* to process the manifest can be measured and recorded while processing is ongoing. Should be
* called prior to processing a {@link DataSetManifest}.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to actively measure and record the time
* taken to load the {@link DataSetManifest}
*/
LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTags(manifest))
.register(appMetrics);
}

/**
* Creates a {@link Timer} for a given {@link DataSetManifest} so that the total time it takes
* to process the manifest can be recorded. Should be used with {@link Timer.Sample#stop(Timer)}
* after processing a {@link DataSetManifest} to record the total duration.
*
* @param manifest the {@link DataSetManifest} to time
* @return the {@link LongTaskTimer} that will be used to record the total time taken to load
* the {@link DataSetManifest}
*/
Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTags(manifest))
.register(appMetrics);
}

/**
* Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric
* based on its corresponding {@link DataSetManifest}.
*
* @param manifest {@link DataSetManifest} from which the values of {@link
* DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link
* DataSetManifest#getIncomingS3Key()} will be used to set the {@link
* #TAG_DATA_SET_TIMESTAMP}, {@link #TAG_IS_SYNTHETIC} and {@link #TAG_MANIFEST} {@link
* Tag}s, respectively
* @return a {@link List} of {@link Tag}s including relevant information from {@code manifest}
*/
private List<Tag> getTags(DataSetManifest manifest) {
final var manifestFullpath = manifest.getIncomingS3Key();
final var manifestFilename =
manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1);
return List.of(
Tag.of(TAG_DATA_SET_TIMESTAMP, manifest.getTimestampText()),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())),
Tag.of(TAG_MANIFEST, manifestFilename));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +33,7 @@ public final class S3RifFile implements RifFile {
private final MetricRegistry appMetrics;

/** The manifest data. */
private final DataSetManifestEntry manifestEntry;
@Getter private final DataSetManifestEntry manifestEntry;

/** The manifest download result. */
private final Future<DataSetQueue.ManifestEntry> manifestEntryDownload;
Expand Down