diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java index 09fbe7ede5..be3aaa1fc8 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/AppConfiguration.java @@ -5,6 +5,7 @@ import gov.cms.bfd.model.rda.MessageError; import gov.cms.bfd.model.rif.RifFileType; import gov.cms.bfd.model.rif.RifRecordEvent; +import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob; import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadOptions; import gov.cms.bfd.pipeline.ccw.rif.extract.ExtractionOptions; import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest; @@ -373,7 +374,13 @@ public final class AppConfiguration extends BaseAppConfiguration { * auto-generated aggregate metric names with suffixes like {@code .avg}. */ public static final Set MICROMETER_CW_ALLOWED_METRIC_NAMES = - Set.of("FissClaimRdaSink.change.latency.millis", "McsClaimRdaSink.change.latency.millis"); + Set.of( + "FissClaimRdaSink.change.latency.millis", + "McsClaimRdaSink.change.latency.millis", + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_ACTIVE_TIMER_NAME, + CcwRifLoadJob.Metrics.MANIFEST_PROCESSING_TOTAL_TIMER_NAME, + DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME, + DefaultDataSetMonitorListener.Metrics.RIF_FILE_PROCESSING_TOTAL_TIMER_NAME); /** * The CCW rif load options. This can be null if the CCW job is not configured, Optional is not diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java index e0acda9a56..a9aab56e42 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/DefaultDataSetMonitorListener.java @@ -9,9 +9,16 @@ import gov.cms.bfd.pipeline.ccw.rif.CcwRifLoadJob; import gov.cms.bfd.pipeline.ccw.rif.extract.RifFileRecords; import gov.cms.bfd.pipeline.ccw.rif.extract.RifFilesProcessor; +import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetManifest; import gov.cms.bfd.pipeline.ccw.rif.extract.s3.DataSetMonitorListener; +import gov.cms.bfd.pipeline.ccw.rif.extract.s3.S3RifFile; import gov.cms.bfd.pipeline.ccw.rif.load.RifLoader; +import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import java.util.List; import java.util.concurrent.TimeUnit; +import lombok.RequiredArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +37,9 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen /** Metrics for this class. */ private final MetricRegistry appMetrics; + /** Micrometer metrics for this class. */ + private final Metrics metrics; + /** Handles processing of new RIF files. */ private final RifFilesProcessor rifProcessor; @@ -40,14 +50,19 @@ public final class DefaultDataSetMonitorListener implements DataSetMonitorListen * Initializes the instance. * * @param appMetrics the {@link MetricRegistry} for the application + * @param micrometerMetrics the {@link MeterRegistry} for the application * @param rifProcessor the {@link RifFilesProcessor} for the application * @param rifLoader the {@link RifLoader} for the application */ DefaultDataSetMonitorListener( - MetricRegistry appMetrics, RifFilesProcessor rifProcessor, RifLoader rifLoader) { + MetricRegistry appMetrics, + MeterRegistry micrometerMetrics, + RifFilesProcessor rifProcessor, + RifLoader rifLoader) { this.appMetrics = appMetrics; this.rifProcessor = rifProcessor; this.rifLoader = rifLoader; + this.metrics = new Metrics(micrometerMetrics); } @Override @@ -66,6 +81,10 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception { Slf4jReporter.forRegistry(rifFileEvent.getEventMetrics()).outputTo(LOGGER).build(); dataSetFileMetricsReporter.start(2, TimeUnit.MINUTES); + final LongTaskTimer.Sample activeTimer = metrics.createActiveTimerForRif(rifFile).start(); + final io.micrometer.core.instrument.Timer.Sample totalTimer = + io.micrometer.core.instrument.Timer.start(); + try { LOGGER.info("Processing file {}", rifFile.getDisplayName()); rifFile.markAsStarted(); @@ -82,6 +101,9 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception { failure = e; } + activeTimer.stop(); + totalTimer.stop(metrics.createTotalTimerForRif(rifFile)); + dataSetFileMetricsReporter.stop(); dataSetFileMetricsReporter.report(); @@ -104,4 +126,110 @@ public void dataAvailable(RifFilesEvent rifFilesEvent) throws Exception { public void noDataAvailable() { // Nothing to do here. } + + /** Metrics for the {@link DefaultDataSetMonitorListener}'s operations. */ + @RequiredArgsConstructor + public static final class Metrics { + /** + * Name of the per-{@link RifFile} data processing {@link LongTaskTimer}s that actively, at each + * Micrometer reporting interval, records and reports the duration of processing of a given + * {@link RifFile}. + * + * @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of + * {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics + * generated from the {@link CcwRifLoadJob}. Additionally, {@link + * DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob} + */ + public static final String RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME = + String.format("%s.rif_file_processing.active", CcwRifLoadJob.class.getSimpleName()); + + /** + * Name of the per-{@link RifFile} data processing {@link Timer}s that report the final duration + * of processing once the {@link RifFile} is processed. + * + * @implNote We use the class name of {@link CcwRifLoadJob} as the metric prefix instead of + * {@link DefaultDataSetMonitorListener} as there are other CCW RIF-related metrics + * generated from the {@link CcwRifLoadJob}. Additionally, {@link + * DefaultDataSetMonitorListener} is indirectly invoked by {@link CcwRifLoadJob} + */ + public static final String RIF_FILE_PROCESSING_TOTAL_TIMER_NAME = + String.format("%s.rif_file_processing.total", CcwRifLoadJob.class.getSimpleName()); + + /** + * Tag indicating which data set (identified by its timestamp in S3) a given metric measured. + */ + private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp"; + + /** Tag indicating which RIF file a given metric measured. */ + private static final String TAG_RIF_FILE = "rif_file"; + + /** + * Tag indicating whether the data load associated with the measured metric was synthetic or + * not. + */ + private static final String TAG_IS_SYNTHETIC = "is_synthetic"; + + /** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */ + private static final String TAG_MANIFEST = "manifest"; + + /** Micrometer {@link MeterRegistry} for the Pipeline application. */ + private final MeterRegistry micrometerMetrics; + + /** + * Creates a {@link LongTaskTimer} for a given {@link RifFile} so that the time it takes to + * process the RIF can be measured while processing is ongoing. Should be called prior to + * processing a {@link RifFile}. + * + * @param rifFile the {@link RifFile} to time + * @return the {@link LongTaskTimer} that will be used to measure the ongoing load time of the + * {@link RifFile} + */ + LongTaskTimer createActiveTimerForRif(RifFile rifFile) { + return LongTaskTimer.builder(RIF_FILE_PROCESSING_ACTIVE_TIMER_NAME) + .tags(getTags(rifFile)) + .register(micrometerMetrics); + } + + /** + * Creates a {@link io.micrometer.core.instrument.Timer} for a given {@link RifFile} so that the + * total time it takes to process the RIF can be recorded once a {@link RifFile} is done + * processing. Should be used with {@link + * io.micrometer.core.instrument.Timer.Sample#stop(io.micrometer.core.instrument.Timer)} after + * processing a {@link RifFile} to record the total duration. + * + * @param rifFile the {@link RifFile} to time + * @return the {@link LongTaskTimer} that will be used to measure the total time taken to load + * the {@link RifFile} + */ + io.micrometer.core.instrument.Timer createTotalTimerForRif(RifFile rifFile) { + return io.micrometer.core.instrument.Timer.builder(RIF_FILE_PROCESSING_TOTAL_TIMER_NAME) + .tags(getTags(rifFile)) + .register(micrometerMetrics); + } + + /** + * Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric + * based on its corresponding {@link DataSetManifest}. + * + * @param rifFile {@link RifFile} from which several properties will be used to set relevant + * {@link Tag}s + * @return a {@link List} of {@link Tag}s including relevant information from {@code rifFile} + */ + private List getTags(RifFile rifFile) { + final var rifFileTag = Tag.of(TAG_RIF_FILE, rifFile.getFileType().name().toLowerCase()); + if (rifFile instanceof S3RifFile s3RifFile) { + final var manifest = s3RifFile.getManifestEntry().getParentManifest(); + final var manifestFullpath = manifest.getIncomingS3Key(); + final var manifestFilename = + manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1); + return List.of( + Tag.of(TAG_DATA_SET_TIMESTAMP, manifest.getTimestampText()), + Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())), + rifFileTag, + Tag.of(TAG_MANIFEST, manifestFilename)); + } + + return List.of(rifFileTag); + } + } } diff --git a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java index f2854163e3..270b96ec92 100644 --- a/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java +++ b/apps/bfd-pipeline/bfd-pipeline-app/src/main/java/gov/cms/bfd/pipeline/app/PipelineApplication.java @@ -537,7 +537,8 @@ private PipelineJob createCcwRifLoadJob( * each data set that is found. */ DataSetMonitorListener dataSetMonitorListener = - new DefaultDataSetMonitorListener(appState.getMetrics(), rifProcessor, rifLoader); + new DefaultDataSetMonitorListener( + appState.getMetrics(), appState.getMeters(), rifProcessor, rifLoader); var s3Factory = new AwsS3ClientFactory(loadOptions.getExtractionOptions().getS3ClientConfig()); // Tell SQ it's ok not to use try-finally here since this will be closed by the CcwRifLoadJob. @SuppressWarnings("java:S2095") diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java index a88cc2ef4c..5b8eea8224 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/CcwRifLoadJob.java @@ -18,6 +18,10 @@ import gov.cms.bfd.pipeline.sharedutils.PipelineJobOutcome; import gov.cms.bfd.pipeline.sharedutils.PipelineJobSchedule; import gov.cms.bfd.sharedutils.exceptions.BadCodeMonkeyException; +import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -31,6 +35,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import lombok.RequiredArgsConstructor; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +162,9 @@ public final class CcwRifLoadJob implements PipelineJob { /** The application metrics. */ private final MetricRegistry appMetrics; + /** Metrics for operations within this job. */ + private final Metrics loadJobMetrics; + /** The extraction options. */ private final ExtractionOptions options; @@ -210,6 +218,7 @@ public CcwRifLoadJob( this.runInterval = runInterval; this.statusReporter = statusReporter; downloadService = Executors.newSingleThreadScheduledExecutor(); + loadJobMetrics = new Metrics(appState.getMeters()); } @Override @@ -348,11 +357,16 @@ public PipelineJobOutcome call() throws Exception { * processing multiple data sets in parallel (which would lead to data * consistency problems). */ + final var activeTimer = + loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start(); + final var totalTimer = Timer.start(); statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key()); dataSetQueue.markAsStarted(manifestRecord); listener.dataAvailable(rifFilesEvent); statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key()); dataSetQueue.markAsProcessed(manifestRecord); + activeTimer.stop(); + totalTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess)); LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE); /* @@ -527,4 +541,92 @@ private boolean isProcessingRequired(S3DataFile dataFileRecord) { dataFileRecord.getFileName()); return false; } + + /** Micrometer metrics and helpers for measuring {@link CcwRifLoadJob} operations. */ + @RequiredArgsConstructor + public static final class Metrics { + + /** + * Name of the per-{@link DataSetManifest} {@link LongTaskTimer}s that actively, at each + * Micrometer reporting interval, records and reports the duration of processing of a given + * {@link DataSetManifest}. + */ + public static final String MANIFEST_PROCESSING_ACTIVE_TIMER_NAME = + String.format("%s.manifest_processing.active", CcwRifLoadJob.class.getSimpleName()); + + /** + * Name of the per-{@link DataSetManifest} {@link Timer}s that report the final duration of + * processing once the {@link DataSetManifest} is processed. + */ + public static final String MANIFEST_PROCESSING_TOTAL_TIMER_NAME = + String.format("%s.manifest_processing.total", CcwRifLoadJob.class.getSimpleName()); + + /** + * Tag indicating which data set (identified by its timestamp in S3) a given metric measured. + */ + private static final String TAG_DATA_SET_TIMESTAMP = "data_set_timestamp"; + + /** + * Tag indicating whether the data load associated with the measured metric was synthetic or + * not. + */ + private static final String TAG_IS_SYNTHETIC = "is_synthetic"; + + /** Tag indicating which {@link DataSetManifest} was associated with the measured metric. */ + private static final String TAG_MANIFEST = "manifest"; + + /** Micrometer {@link MeterRegistry} for the Pipeline application. */ + private final MeterRegistry appMetrics; + + /** + * Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes + * to process the manifest can be measured and recorded while processing is ongoing. Should be + * called prior to processing a {@link DataSetManifest}. + * + * @param manifest the {@link DataSetManifest} to time + * @return the {@link LongTaskTimer} that will be used to actively measure and record the time + * taken to load the {@link DataSetManifest} + */ + LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) { + return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME) + .tags(getTags(manifest)) + .register(appMetrics); + } + + /** + * Creates a {@link Timer} for a given {@link DataSetManifest} so that the total time it takes + * to process the manifest can be recorded. Should be used with {@link Timer.Sample#stop(Timer)} + * after processing a {@link DataSetManifest} to record the total duration. + * + * @param manifest the {@link DataSetManifest} to time + * @return the {@link LongTaskTimer} that will be used to record the total time taken to load + * the {@link DataSetManifest} + */ + Timer createTotalTimerForManifest(DataSetManifest manifest) { + return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME) + .tags(getTags(manifest)) + .register(appMetrics); + } + + /** + * Returns a {@link List} of default {@link Tag}s that is used to disambiguate a given metric + * based on its corresponding {@link DataSetManifest}. + * + * @param manifest {@link DataSetManifest} from which the values of {@link + * DataSetManifest#getTimestampText()}, {@link DataSetManifest#isSyntheticData()} and {@link + * DataSetManifest#getIncomingS3Key()} will be used to set the {@link + * #TAG_DATA_SET_TIMESTAMP}, {@link #TAG_IS_SYNTHETIC} and {@link #TAG_MANIFEST} {@link + * Tag}s, respectively + * @return a {@link List} of {@link Tag}s including relevant information from {@code manifest} + */ + private List getTags(DataSetManifest manifest) { + final var manifestFullpath = manifest.getIncomingS3Key(); + final var manifestFilename = + manifestFullpath.substring(manifestFullpath.lastIndexOf("/") + 1); + return List.of( + Tag.of(TAG_DATA_SET_TIMESTAMP, manifest.getTimestampText()), + Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(manifest.isSyntheticData())), + Tag.of(TAG_MANIFEST, manifestFilename)); + } + } } diff --git a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/S3RifFile.java b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/S3RifFile.java index 890f4d7e85..08e0b85a1a 100644 --- a/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/S3RifFile.java +++ b/apps/bfd-pipeline/bfd-pipeline-ccw-rif/src/main/java/gov/cms/bfd/pipeline/ccw/rif/extract/s3/S3RifFile.java @@ -18,6 +18,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public final class S3RifFile implements RifFile { private final MetricRegistry appMetrics; /** The manifest data. */ - private final DataSetManifestEntry manifestEntry; + @Getter private final DataSetManifestEntry manifestEntry; /** The manifest download result. */ private final Future manifestEntryDownload;