From 58930f9ea30845e33be99985c05a7dab0b1de9d4 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 29 Nov 2024 09:49:52 -0300 Subject: [PATCH] Fix data race by using a state pool Signed-off-by: Arthur Silva Sens --- .../prometheusremotewriteexporter/exporter.go | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 74e1cff42b79..90e0e397607e 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -70,21 +70,25 @@ var bufferPool = sync.Pool{ // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type prwExporter struct { - endpointURL *url.URL - client *http.Client - wg *sync.WaitGroup - closeChan chan struct{} - concurrency int - userAgentHeader string - maxBatchSizeBytes int - clientSettings *confighttp.ClientConfig - settings component.TelemetrySettings - retrySettings configretry.BackOffConfig - retryOnHTTP429 bool - wal *prweWAL - exporterSettings prometheusremotewrite.Settings - telemetry prwTelemetry - batchTimeSeriesState batchTimeSeriesState + endpointURL *url.URL + client *http.Client + wg *sync.WaitGroup + closeChan chan struct{} + concurrency int + userAgentHeader string + maxBatchSizeBytes int + clientSettings *confighttp.ClientConfig + settings component.TelemetrySettings + retrySettings configretry.BackOffConfig + retryOnHTTP429 bool + wal *prweWAL + exporterSettings prometheusremotewrite.Settings + telemetry prwTelemetry + + // When concurrency is enabled, concurrent goroutines would potentially + // fight over the same batchState object. To avoid this, we use a pool + // to provide each goroutine with its own state. + batchStatePool sync.Pool } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -140,7 +144,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { SendMetadata: cfg.SendMetadata, }, telemetry: prwTelemetry, - batchTimeSeriesState: newBatchTimeSericesState(), + batchStatePool: sync.Pool{New: func() any { return newBatchTimeSericesState() }}, } if prwe.exporterSettings.ExportCreatedMetric { @@ -228,8 +232,10 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro return nil } + state := prwe.batchStatePool.Get().(batchTimeSeriesState) + defer prwe.batchStatePool.Put(state) // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &state) if err != nil { return err }