Skip to content

Commit

Permalink
Fix data race by using a state pool
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Nov 29, 2024
1 parent f54684c commit 58930f9
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,25 @@ var bufferPool = sync.Pool{

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry

// When concurrency is enabled, concurrent goroutines would potentially
// fight over the same batchState object. To avoid this, we use a pool
// to provide each goroutine with its own state.
batchStatePool sync.Pool
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -140,7 +144,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
batchStatePool: sync.Pool{New: func() any { return newBatchTimeSericesState() }},
}

if prwe.exporterSettings.ExportCreatedMetric {
Expand Down Expand Up @@ -228,8 +232,10 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
return nil
}

state := prwe.batchStatePool.Get().(batchTimeSeriesState)
defer prwe.batchStatePool.Put(state)
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &state)
if err != nil {
return err
}
Expand Down

0 comments on commit 58930f9

Please sign in to comment.