Skip to content

Commit

Permalink
[exporter/prometheusremotewrite] Fix data race by introducing pool of…
Browse files Browse the repository at this point in the history
… batch state (open-telemetry#36601)

#### Description
This is an alternative for
open-telemetry#36524
and open-telemetry#36600

This PR does a couple of things:

* Add a test written by @edma2 that exposes a data race on the batch state
when running multiple consumers.
* Add a benchmark for PushMetrics, with options to run with a stable
number of metrics or varying metrics.
* Fix the data race by introducing a `sync.Pool` of batch states.

#### Benchmark results

results comparing `main`,
open-telemetry#36600
and this PR:

```console
arthursens$ benchstat main.txt withoutState.txt syncpool.txt 
goos: darwin
goarch: arm64
pkg: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter
cpu: Apple M2 Pro
                            │  main.txt   │          withoutState.txt           │         syncpool.txt         │
                            │   sec/op    │    sec/op     vs base               │   sec/op     vs base         │
PushMetricsVaryingMetrics-2   8.066m ± 5%   13.821m ± 9%  +71.36% (p=0.002 n=6)   8.316m ± 6%  ~ (p=0.065 n=6)

                            │   main.txt   │           withoutState.txt            │            syncpool.txt            │
                            │     B/op     │     B/op       vs base                │     B/op      vs base              │
PushMetricsVaryingMetrics-2   5.216Mi ± 0%   34.436Mi ± 0%  +560.17% (p=0.002 n=6)   5.548Mi ± 0%  +6.36% (p=0.002 n=6)

                            │  main.txt   │       withoutState.txt       │         syncpool.txt         │
                            │  allocs/op  │  allocs/op   vs base         │  allocs/op   vs base         │
PushMetricsVaryingMetrics-2   56.02k ± 0%   56.05k ± 0%  ~ (p=0.721 n=6)   56.04k ± 0%  ~ (p=0.665 n=6)
```

---------

Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens authored and AkhigbeEromo committed Jan 13, 2025
1 parent 1b1e5c2 commit 9d0297b
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 23 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwexporter-syncpool-batchstate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: In preparation for re-introducing multiple workers, we're removing a data race when batching timeseries.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36601]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
42 changes: 24 additions & 18 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,25 @@ var bufferPool = sync.Pool{

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
endpointURL *url.URL
client *http.Client
wg *sync.WaitGroup
closeChan chan struct{}
concurrency int
userAgentHeader string
maxBatchSizeBytes int
clientSettings *confighttp.ClientConfig
settings component.TelemetrySettings
retrySettings configretry.BackOffConfig
retryOnHTTP429 bool
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry

// When concurrency is enabled, concurrent goroutines would potentially
// fight over the same batchState object. To avoid this, we use a pool
// to provide each goroutine with its own state.
batchStatePool sync.Pool
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -139,8 +143,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
AddMetricSuffixes: cfg.AddMetricSuffixes,
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
telemetry: prwTelemetry,
batchStatePool: sync.Pool{New: func() any { return newBatchTimeSericesState() }},
}

if prwe.exporterSettings.ExportCreatedMetric {
Expand Down Expand Up @@ -228,8 +232,10 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
return nil
}

state := prwe.batchStatePool.Get().(*batchTimeSeriesState)
defer prwe.batchStatePool.Put(state)
// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, state)
if err != nil {
return err
}
Expand Down
138 changes: 138 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test_PushMetricsConcurrent checks that everything works when there is more than one
// goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
//
// Each generated payload is tagged with a unique "test_id" attribute. The fake remote-write
// server records every id it sees, and the test asserts that each id arrives exactly once.
func Test_PushMetricsConcurrent(t *testing.T) {
	n := 1000
	ms := make([]pmetric.Metrics, n)
	testIDKey := "test_id"
	for i := 0; i < n; i++ {
		m := testdata.GenerateMetricsOneMetric()
		dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
		for j := 0; j < dps.Len(); j++ {
			dps.At(j).Attributes().PutInt(testIDKey, int64(i))
		}
		ms[i] = m
	}

	// received is written from the server's handler goroutines, so it is guarded by mu.
	received := make(map[int]prompb.TimeSeries)
	var mu sync.Mutex

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// This handler does not run on the test goroutine, so it must never call
		// t.Fatal/require (FailNow is only valid on the test goroutine). Failures are
		// recorded with assert and the handler returns early instead.
		body, err := io.ReadAll(r.Body)
		if !assert.NoError(t, err) {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		assert.NotNil(t, body)
		assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
		assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))

		// Decompress and unmarshal the request, then extract its TimeSeries.
		dest, err := snappy.Decode(nil, body)
		assert.NoError(t, err)

		wr := &prompb.WriteRequest{}
		assert.NoError(t, proto.Unmarshal(dest, wr))
		// Bail out before indexing into Timeseries if the request does not have the
		// expected shape; indexing an empty slice would panic inside the handler.
		if !assert.Len(t, wr.Timeseries, 2) {
			http.Error(w, "unexpected number of timeseries", http.StatusBadRequest)
			return
		}

		ts := wr.Timeseries[0]
		foundLabel := false
		for _, label := range ts.Labels {
			if label.Name != testIDKey {
				continue
			}
			id, convErr := strconv.Atoi(label.Value)
			assert.NoError(t, convErr)
			mu.Lock()
			_, seen := received[id]
			assert.False(t, seen) // fail if we already saw it
			received[id] = ts
			mu.Unlock()
			foundLabel = true
			break
		}
		assert.True(t, foundLabel)
		w.WriteHeader(http.StatusOK)
	}))

	defer server.Close()

	// Adjusted retry settings for faster testing
	retrySettings := configretry.BackOffConfig{
		Enabled:         true,
		InitialInterval: 100 * time.Millisecond, // Shorter initial interval
		MaxInterval:     1 * time.Second,        // Shorter max interval
		MaxElapsedTime:  2 * time.Second,        // Shorter max elapsed time
	}
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = server.URL
	clientConfig.ReadBufferSize = 0
	clientConfig.WriteBufferSize = 512 * 1024
	cfg := &Config{
		Namespace:         "",
		ClientConfig:      clientConfig,
		MaxBatchSizeBytes: 3000000,
		RemoteWriteQueue:  RemoteWriteQueue{NumConsumers: 1},
		TargetInfo: &TargetInfo{
			Enabled: true,
		},
		CreatedMetric: &CreatedMetric{
			Enabled: false,
		},
		BackOffConfig: retrySettings,
	}

	assert.NotNil(t, cfg)
	set := exportertest.NewNopSettings()
	set.MetricsLevel = configtelemetry.LevelBasic

	prwe, nErr := newPRWExporter(cfg, set)

	require.NoError(t, nErr)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
	defer func() {
		require.NoError(t, prwe.Shutdown(ctx))
	}()

	var wg sync.WaitGroup
	wg.Add(n)
	for _, m := range ms {
		// Pass the payload explicitly so each goroutine owns its own copy of the loop
		// variable regardless of the Go version's loop-variable semantics.
		go func(m pmetric.Metrics) {
			// Deferred so the WaitGroup is released even if PushMetrics panics.
			defer wg.Done()
			assert.NoError(t, prwe.PushMetrics(ctx, m))
		}(m)
	}
	wg.Wait()
	assert.Len(t, received, n)
}
78 changes: 78 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1269,3 +1270,80 @@ func benchmarkExecute(b *testing.B, numSample int) {
require.NoError(b, err)
}
}

// BenchmarkPushMetrics runs benchmarkPushMetrics once per fixed metric count,
// each as its own sub-benchmark with a single consumer.
func BenchmarkPushMetrics(b *testing.B) {
	sizes := [...]int{10, 100, 1000, 10000}
	for idx := range sizes {
		count := sizes[idx]
		name := fmt.Sprintf("numMetrics=%d", count)
		b.Run(name, func(sub *testing.B) {
			benchmarkPushMetrics(sub, count, 1)
		})
	}
}

// BenchmarkPushMetricsVaryingMetrics runs benchmarkPushMetrics with a single consumer
// and a metric count of -1, which benchmarkPushMetrics treats as a request to vary the
// number of metrics from one payload to the next.
func BenchmarkPushMetricsVaryingMetrics(b *testing.B) {
	const (
		varyingMetrics = -1
		consumers      = 1
	)
	benchmarkPushMetrics(b, varyingMetrics, consumers)
}

// benchmarkPushMetrics benchmarks the PushMetrics method with a given number of metrics.
// If numMetrics is -1, it will benchmark with a varying number of metrics, cycling through
// 10, 100, 1000 and 10000. numConsumers is passed through to the exporter's
// RemoteWriteQueue configuration.
//
// All payloads (one per benchmark iteration) are generated before the timer is reset, so
// only the PushMetrics calls are measured.
func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) {
	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer mockServer.Close()
	endpointURL, err := url.Parse(mockServer.URL)
	require.NoError(b, err)

	tel := setupTestTelemetry()
	set := tel.NewSettings()
	// Adjusted retry settings for faster testing
	retrySettings := configretry.BackOffConfig{
		Enabled:         true,
		InitialInterval: 100 * time.Millisecond, // Shorter initial interval
		MaxInterval:     1 * time.Second,        // Shorter max interval
		MaxElapsedTime:  2 * time.Second,        // Shorter max elapsed time
	}
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = endpointURL.String()
	clientConfig.ReadBufferSize = 0
	clientConfig.WriteBufferSize = 512 * 1024
	cfg := &Config{
		Namespace:         "",
		ClientConfig:      clientConfig,
		MaxBatchSizeBytes: 3000,
		RemoteWriteQueue:  RemoteWriteQueue{NumConsumers: numConsumers},
		BackOffConfig:     retrySettings,
		TargetInfo:        &TargetInfo{Enabled: true},
		CreatedMetric:     &CreatedMetric{Enabled: false},
	}
	// Named prwe rather than exporter so the local cannot shadow an imported package.
	prwe, err := newPRWExporter(cfg, set)
	require.NoError(b, err)

	// Exactly b.N payloads are produced, so size the slice once up front instead of
	// growing it repeatedly with append.
	metrics := make([]pmetric.Metrics, 0, b.N)
	for n := 0; n < b.N; n++ {
		actualNumMetrics := numMetrics
		if numMetrics == -1 {
			// Cycle through 10^1 .. 10^4 metrics per payload.
			actualNumMetrics = int(math.Pow(10, float64(n%4+1)))
		}
		m := testdata.GenerateMetricsManyMetricsSameResource(actualNumMetrics)
		for i := 0; i < m.MetricCount(); i++ {
			dp := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty()
			dp.SetIntValue(int64(i))
			// We add a per-metric key to the attributes to ensure that we create a new time series during translation for each metric.
			dp.Attributes().PutInt("random_key", int64(i))
		}
		metrics = append(metrics, m)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(b, prwe.Start(ctx, componenttest.NewNopHost()))
	defer func() {
		require.NoError(b, prwe.Shutdown(ctx))
	}()
	b.ReportAllocs()
	b.ResetTimer()
	for _, m := range metrics {
		require.NoError(b, prwe.PushMetrics(ctx, m))
	}
}
4 changes: 2 additions & 2 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type batchTimeSeriesState struct {
nextRequestBufferSize int
}

func newBatchTimeSericesState() batchTimeSeriesState {
return batchTimeSeriesState{
func newBatchTimeSericesState() *batchTimeSeriesState {
return &batchTimeSeriesState{
nextTimeSeriesBufferSize: math.MaxInt,
nextMetricMetadataBufferSize: math.MaxInt,
nextRequestBufferSize: 0,
Expand Down
6 changes: 3 additions & 3 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_batchTimeSeries(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state)
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, state)
if tt.returnErr {
assert.Error(t, err)
return
Expand Down Expand Up @@ -97,7 +97,7 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) {
tsMap1 := getTimeseriesMap(tsArray)

state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
requests, err := batchTimeSeries(tsMap1, 1000000, nil, state)

assert.NoError(t, err)
assert.Len(t, requests, 18)
Expand Down Expand Up @@ -132,7 +132,7 @@ func Benchmark_batchTimeSeries(b *testing.B) {
state := newBatchTimeSericesState()
// Run batchTimeSeries 100 times with a 1mb max request size
for i := 0; i < b.N; i++ {
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
requests, err := batchTimeSeries(tsMap1, 1000000, nil, state)
assert.NoError(b, err)
assert.Len(b, requests, 18)
}
Expand Down

0 comments on commit 9d0297b

Please sign in to comment.