From 7281556ee5d5b09e6835b1f48777253d0ef08c27 Mon Sep 17 00:00:00 2001
From: Eugene Ma
Date: Wed, 13 Nov 2024 15:38:57 -0800
Subject: [PATCH] [prometheusremotewriteexporter] reduce allocations when
 serializing protobufs (#35185)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

**Description:**

There are two allocations that happen on every push:

1. Serializing the remote write protobuf to a byte array.
2. Compressing the byte array with snappy.

Since these buffers can be quite large, we can save some allocations with a `sync.Pool`.

**Link to tracking Issue:**

**Testing:**

Tests still pass. We have been running this successfully in production for a few months now.

```
                             │ /tmp/old.txt │            /tmp/new.txt             │
                             │    sec/op    │    sec/op     vs base               │
Execute/numSample=100-14        43.11µ ± 2%   43.09µ ± 2%       ~ (p=0.853 n=10)
Execute/numSample=1000-14       105.4µ ± 1%   102.2µ ± 1%  -3.04% (p=0.000 n=10)
Execute/numSample=10000-14      685.5µ ± 1%   663.6µ ± 5%  -3.19% (p=0.023 n=10)
geomean                         146.0µ        143.0µ       -2.10%

                             │ /tmp/old.txt │            /tmp/new.txt              │
                             │     B/op     │     B/op       vs base               │
Execute/numSample=100-14       14.809Ki ± 0%   6.091Ki ± 0%  -58.87% (p=0.000 n=10)
Execute/numSample=1000-14       94.18Ki ± 0%   22.16Ki ± 0%  -76.47% (p=0.000 n=10)
Execute/numSample=10000-14     726.23Ki ± 0%   39.83Ki ± 0%  -94.52% (p=0.000 n=10)
geomean                         100.4Ki        17.52Ki       -82.56%

                             │ /tmp/old.txt │           /tmp/new.txt            │
                             │  allocs/op   │  allocs/op   vs base              │
Execute/numSample=100-14         81.00 ± 0%    79.00 ± 0%  -2.47% (p=0.000 n=10)
Execute/numSample=1000-14        85.00 ± 0%    83.00 ± 0%  -2.35% (p=0.000 n=10)
Execute/numSample=10000-14       85.00 ± 0%    83.00 ± 0%  -2.35% (p=0.000 n=10)
geomean                          83.65         81.64       -2.39%
```

**Documentation:**

---------

Co-authored-by: David Ashpole
---
 ...ewriteexporter-optimize-serialization.yaml | 27 ++++++
 .../prometheusremotewriteexporter/exporter.go | 33 ++++++-
 .../exporter_test.go                          | 94 +++++++++++++++++++
 3 files changed, 151 insertions(+), 3 deletions(-)
 create mode 100644 .chloggen/prometheusremotewriteexporter-optimize-serialization.yaml

diff --git a/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml b/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml
new file mode 100644
index 000000000000..7535f6626170
--- /dev/null
+++ b/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'enhancement'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusremotewriteexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: reduce allocation when serializing protobuf
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35185]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
index aa4a58082416..f8bbf7755be8 100644
--- a/exporter/prometheusremotewriteexporter/exporter.go
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -53,6 +53,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
 	p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
 }
 
+type buffer struct {
+	protobuf *proto.Buffer
+	snappy   []byte
+}
+
+// A reusable buffer pool for serializing protobufs and compressing them with Snappy.
+var bufferPool = sync.Pool{
+	New: func() any {
+		return &buffer{
+			protobuf: proto.NewBuffer(nil),
+			snappy:   nil,
+		}
+	},
+}
+
 // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint.
 type prwExporter struct {
 	endpointURL       *url.URL
@@ -271,14 +286,26 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq
 }
 
 func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error {
+	buf := bufferPool.Get().(*buffer)
+	buf.protobuf.Reset()
+	defer bufferPool.Put(buf)
+
 	// Uses proto.Marshal to convert the WriteRequest into bytes array
-	data, errMarshal := proto.Marshal(writeReq)
+	errMarshal := buf.protobuf.Marshal(writeReq)
 	if errMarshal != nil {
 		return consumererror.NewPermanent(errMarshal)
 	}
 	// If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer.
-	// Therefore we always let Snappy decide the size of the buffer.
-	compressedData := snappy.Encode(nil, data)
+	// Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards.
+	maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
+	if maxCompressedLen > len(buf.snappy) {
+		if cap(buf.snappy) < maxCompressedLen {
+			buf.snappy = make([]byte, maxCompressedLen)
+		} else {
+			buf.snappy = buf.snappy[:maxCompressedLen]
+		}
+	}
+	compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes())
 
 	// executeFunc can be used for backoff and non backoff scenarios.
 	executeFunc := func() error {
diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go
index f450f6e02461..1a15dd4a7b47 100644
--- a/exporter/prometheusremotewriteexporter/exporter_test.go
+++ b/exporter/prometheusremotewriteexporter/exporter_test.go
@@ -5,10 +5,13 @@ package prometheusremotewriteexporter
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
+	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -1175,3 +1178,94 @@ func TestRetries(t *testing.T) {
 		})
 	}
 }
+
+func BenchmarkExecute(b *testing.B) {
+	for _, numSample := range []int{100, 1000, 10000} {
+		b.Run(fmt.Sprintf("numSample=%d", numSample), func(b *testing.B) {
+			benchmarkExecute(b, numSample)
+		})
+	}
+}
+
+func benchmarkExecute(b *testing.B, numSample int) {
+	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusOK)
+	}))
+	defer mockServer.Close()
+	endpointURL, err := url.Parse(mockServer.URL)
+	require.NoError(b, err)
+
+	// Create the prwExporter
+	exporter := &prwExporter{
+		endpointURL: endpointURL,
+		client:      http.DefaultClient,
+	}
+
+	generateSamples := func(n int) []prompb.Sample {
+		samples := make([]prompb.Sample, 0, n)
+		for i := 0; i < n; i++ {
+			samples = append(samples, prompb.Sample{
+				Timestamp: int64(i),
+				Value:     float64(i),
+			})
+		}
+		return samples
+	}
+
+	generateHistograms := func(n int) []prompb.Histogram {
+		histograms := make([]prompb.Histogram, 0, n)
+		for i := 0; i < n; i++ {
+			histograms = append(histograms, prompb.Histogram{
+				Timestamp:      int64(i),
+				Count:          &prompb.Histogram_CountInt{CountInt: uint64(i)},
+				PositiveCounts: []float64{float64(i)},
+			})
+		}
+		return histograms
+	}
+
+	reqs := make([]*prompb.WriteRequest, 0, b.N)
+	const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
+	for n := 0; n < b.N; n++ {
+		num := strings.Repeat(strconv.Itoa(n), 16)
+		req := &prompb.WriteRequest{
+			Metadata: []prompb.MetricMetadata{
+				{
+					Type: prompb.MetricMetadata_COUNTER,
+					Unit: "seconds",
+					Help: "This is a counter",
+				},
+				{
+					Type: prompb.MetricMetadata_HISTOGRAM,
+					Unit: "seconds",
+					Help: "This is a histogram",
+				},
+			},
+			Timeseries: []prompb.TimeSeries{
+				{
+					Samples: generateSamples(numSample),
+					Labels: []prompb.Label{
+						{Name: "__name__", Value: "test_metric"},
+						{Name: "test_label_name_" + num, Value: labelValue + num},
+					},
+				},
+				{
+					Histograms: generateHistograms(numSample),
+					Labels: []prompb.Label{
+						{Name: "__name__", Value: "test_histogram"},
+						{Name: "test_label_name_" + num, Value: labelValue + num},
+					},
+				},
+			},
+		}
+		reqs = append(reqs, req)
+	}
+
+	ctx := context.Background()
+	b.ReportAllocs()
+	b.ResetTimer()
+	for _, req := range reqs {
+		err := exporter.execute(ctx, req)
+		require.NoError(b, err)
+	}
+}
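
For reference, the pooling pattern the Description refers to can be condensed into a standalone sketch. This is illustrative only and not part of the patch: it assumes the gogo protobuf, golang/snappy, and Prometheus `prompb` modules the exporter already imports, and the `send` callback is a hypothetical stand-in for the exporter's HTTP POST.

```go
// Package poolsketch is an illustrative sketch of reusing protobuf and snappy
// buffers via sync.Pool, mirroring the exporter change above.
package poolsketch

import (
	"sync"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"
	"github.com/prometheus/prometheus/prompb"
)

// buffer bundles the two large per-push allocations: the marshaled protobuf
// and the snappy-compressed copy of it.
type buffer struct {
	protobuf *proto.Buffer
	snappy   []byte
}

var bufferPool = sync.Pool{
	New: func() any {
		return &buffer{protobuf: proto.NewBuffer(nil)}
	},
}

// pushCompressed marshals req, snappy-compresses it into a pooled buffer, and
// hands the result to send (hypothetical stand-in for the HTTP POST). The
// compressed bytes must be fully consumed before this function returns,
// because the deferred Put recycles the backing array.
func pushCompressed(req *prompb.WriteRequest, send func([]byte) error) error {
	buf := bufferPool.Get().(*buffer)
	buf.protobuf.Reset()
	defer bufferPool.Put(buf)

	if err := buf.protobuf.Marshal(req); err != nil {
		return err
	}

	// snappy.Encode only reuses dst when it is at least MaxEncodedLen bytes
	// long; otherwise it allocates a fresh slice and the pooling is wasted.
	maxLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes()))
	if cap(buf.snappy) < maxLen {
		buf.snappy = make([]byte, maxLen)
	}
	compressed := snappy.Encode(buf.snappy[:maxLen], buf.protobuf.Bytes())
	return send(compressed)
}
```

Passing the compressed bytes to a callback rather than returning them keeps the pooled slice from escaping the function: once the deferred `Put` runs, another goroutine may overwrite the backing array, which is also why the exporter only uses `compressedData` inside `execute`.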