Skip to content

Commit

Permalink
[prometheusremotewriteexporter] reduce allocations when serializing p…
Browse files Browse the repository at this point in the history
…rotobufs (#35185)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

There are two allocations that happen on every push
1. Serializing the remote write protobuf to a byte array.
2. Compressing the byte array with snappy.

Since these buffers can be quite large, we can save some allocations
with a `sync.Pool`.

**Link to tracking Issue:** <Issue number if applicable>

**Testing:**

Tests still pass. We have been running this successfully in production
for a few months now.

```
                           │ /tmp/old.txt │            /tmp/new.txt            │
                           │    sec/op    │   sec/op     vs base               │
Execute/numSample=100-14      43.11µ ± 2%   43.09µ ± 2%       ~ (p=0.853 n=10)
Execute/numSample=1000-14     105.4µ ± 1%   102.2µ ± 1%  -3.04% (p=0.000 n=10)
Execute/numSample=10000-14    685.5µ ± 1%   663.6µ ± 5%  -3.19% (p=0.023 n=10)
geomean                       146.0µ        143.0µ       -2.10%

                           │ /tmp/old.txt  │             /tmp/new.txt             │
                           │     B/op      │     B/op      vs base                │
Execute/numSample=100-14     14.809Ki ± 0%   6.091Ki ± 0%  -58.87% (p=0.000 n=10)
Execute/numSample=1000-14     94.18Ki ± 0%   22.16Ki ± 0%  -76.47% (p=0.000 n=10)
Execute/numSample=10000-14   726.23Ki ± 0%   39.83Ki ± 0%  -94.52% (p=0.000 n=10)
geomean                       100.4Ki        17.52Ki       -82.56%

                           │ /tmp/old.txt │           /tmp/new.txt            │
                           │  allocs/op   │ allocs/op   vs base               │
Execute/numSample=100-14       81.00 ± 0%   79.00 ± 0%  -2.47% (p=0.000 n=10)
Execute/numSample=1000-14      85.00 ± 0%   83.00 ± 0%  -2.35% (p=0.000 n=10)
Execute/numSample=10000-14     85.00 ± 0%   83.00 ± 0%  -2.35% (p=0.000 n=10)
geomean                        83.65        81.64       -2.39%
```

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: David Ashpole <[email protected]>
  • Loading branch information
edma2 and dashpole authored Nov 13, 2024
1 parent d3f8160 commit 7281556
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: reduce allocation when serializing protobuf

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35185]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
33 changes: 30 additions & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

type buffer struct {
protobuf *proto.Buffer
snappy []byte
}

// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
var bufferPool = sync.Pool{
New: func() any {
return &buffer{
protobuf: proto.NewBuffer(nil),
snappy: nil,
}
},
}

// prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
type prwExporter struct {
endpointURL *url.URL
Expand Down Expand Up @@ -271,14 +286,26 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
}

func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
buf := bufferPool.Get().(*buffer)
buf.protobuf.Reset()
defer bufferPool.Put(buf)

// Uses proto.Marshal to convert the WriteRequest into bytes array
data, errMarshal := proto.Marshal(writeReq)
errMarshal := buf.protobuf.Marshal(writeReq)
if errMarshal != nil {
return consumererror.NewPermanent(errMarshal)
}
// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
// Therefore we always let Snappy decide the size of the buffer.
compressedData := snappy.Encode(nil, data)
// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
if maxCompressedLen > len(buf.snappy) {
if cap(buf.snappy) < maxCompressedLen {
buf.snappy = make([]byte, maxCompressedLen)
} else {
buf.snappy = buf.snappy[:maxCompressedLen]
}
}
compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())

// executeFunc can be used for backoff and non backoff scenarios.
executeFunc := func() error {
Expand Down
94 changes: 94 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package prometheusremotewriteexporter

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1175,3 +1178,94 @@ func TestRetries(t *testing.T) {
})
}
}

func BenchmarkExecute(b *testing.B) {
for _, numSample := range []int{100, 1000, 10000} {
b.Run(fmt.Sprintf("numSample=%d", numSample), func(b *testing.B) {
benchmarkExecute(b, numSample)
})
}
}

func benchmarkExecute(b *testing.B, numSample int) {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer mockServer.Close()
endpointURL, err := url.Parse(mockServer.URL)
require.NoError(b, err)

// Create the prwExporter
exporter := &prwExporter{
endpointURL: endpointURL,
client: http.DefaultClient,
}

generateSamples := func(n int) []prompb.Sample {
samples := make([]prompb.Sample, 0, n)
for i := 0; i < n; i++ {
samples = append(samples, prompb.Sample{
Timestamp: int64(i),
Value: float64(i),
})
}
return samples
}

generateHistograms := func(n int) []prompb.Histogram {
histograms := make([]prompb.Histogram, 0, n)
for i := 0; i < n; i++ {
histograms = append(histograms, prompb.Histogram{
Timestamp: int64(i),
Count: &prompb.Histogram_CountInt{CountInt: uint64(i)},
PositiveCounts: []float64{float64(i)},
})
}
return histograms
}

reqs := make([]*prompb.WriteRequest, 0, b.N)
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
for n := 0; n < b.N; n++ {
num := strings.Repeat(strconv.Itoa(n), 16)
req := &prompb.WriteRequest{
Metadata: []prompb.MetricMetadata{
{
Type: prompb.MetricMetadata_COUNTER,
Unit: "seconds",
Help: "This is a counter",
},
{
Type: prompb.MetricMetadata_HISTOGRAM,
Unit: "seconds",
Help: "This is a histogram",
},
},
Timeseries: []prompb.TimeSeries{
{
Samples: generateSamples(numSample),
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
},
{
Histograms: generateHistograms(numSample),
Labels: []prompb.Label{
{Name: "__name__", Value: "test_histogram"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
},
},
}
reqs = append(reqs, req)
}

ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for _, req := range reqs {
err := exporter.execute(ctx, req)
require.NoError(b, err)
}
}

0 comments on commit 7281556

Please sign in to comment.