diff --git a/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml b/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml new file mode 100644 index 000000000000..7535f6626170 --- /dev/null +++ b/.chloggen/prometheusremotewriteexporter-optimize-serialization.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: reduce allocation when serializing protobuf + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35185] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index aa4a58082416..f8bbf7755be8 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -53,6 +53,21 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...)) } +type buffer struct { + protobuf *proto.Buffer + snappy []byte +} + +// A reusable buffer pool for serializing protobufs and compressing them with Snappy. +var bufferPool = sync.Pool{ + New: func() any { + return &buffer{ + protobuf: proto.NewBuffer(nil), + snappy: nil, + } + }, +} + // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type prwExporter struct { endpointURL *url.URL @@ -271,14 +286,26 @@ func (prwe *prwExporter) export(ctx context.Context, requests []*prompb.WriteReq } func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequest) error { + buf := bufferPool.Get().(*buffer) + buf.protobuf.Reset() + defer bufferPool.Put(buf) + // Uses proto.Marshal to convert the WriteRequest into bytes array - data, errMarshal := proto.Marshal(writeReq) + errMarshal := buf.protobuf.Marshal(writeReq) if errMarshal != nil { return consumererror.NewPermanent(errMarshal) } // If we don't pass a buffer large enough, Snappy Encode function will not use it and instead will allocate a new buffer. - // Therefore we always let Snappy decide the size of the buffer. - compressedData := snappy.Encode(nil, data) + // Manually grow the buffer to make sure Snappy uses it and we can re-use it afterwards. + maxCompressedLen := snappy.MaxEncodedLen(len(buf.protobuf.Bytes())) + if maxCompressedLen > len(buf.snappy) { + if cap(buf.snappy) < maxCompressedLen { + buf.snappy = make([]byte, maxCompressedLen) + } else { + buf.snappy = buf.snappy[:maxCompressedLen] + } + } + compressedData := snappy.Encode(buf.snappy, buf.protobuf.Bytes()) // executeFunc can be used for backoff and non backoff scenarios. executeFunc := func() error { diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index f450f6e02461..1a15dd4a7b47 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -5,10 +5,13 @@ package prometheusremotewriteexporter import ( "context" + "fmt" "io" "net/http" "net/http/httptest" "net/url" + "strconv" + "strings" "sync" "testing" "time" @@ -1175,3 +1178,94 @@ func TestRetries(t *testing.T) { }) } } + +func BenchmarkExecute(b *testing.B) { + for _, numSample := range []int{100, 1000, 10000} { + b.Run(fmt.Sprintf("numSample=%d", numSample), func(b *testing.B) { + benchmarkExecute(b, numSample) + }) + } +} + +func benchmarkExecute(b *testing.B, numSample int) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer mockServer.Close() + endpointURL, err := url.Parse(mockServer.URL) + require.NoError(b, err) + + // Create the prwExporter + exporter := &prwExporter{ + endpointURL: endpointURL, + client: http.DefaultClient, + } + + generateSamples := func(n int) []prompb.Sample { + samples := make([]prompb.Sample, 0, n) + for i := 0; i < n; i++ { + samples = append(samples, prompb.Sample{ + Timestamp: int64(i), + Value: float64(i), + }) + } + return samples + } + + generateHistograms := func(n int) []prompb.Histogram { + histograms := make([]prompb.Histogram, 0, n) + for i := 0; i < n; i++ { + histograms = append(histograms, prompb.Histogram{ + Timestamp: int64(i), + Count: &prompb.Histogram_CountInt{CountInt: uint64(i)}, + PositiveCounts: []float64{float64(i)}, + }) + } + return histograms + } + + reqs := make([]*prompb.WriteRequest, 0, b.N) + const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" + for n := 0; n < b.N; n++ { + num := strings.Repeat(strconv.Itoa(n), 16) + req := &prompb.WriteRequest{ + Metadata: []prompb.MetricMetadata{ + { + Type: prompb.MetricMetadata_COUNTER, + Unit: "seconds", + Help: "This is a counter", + }, + { + Type: prompb.MetricMetadata_HISTOGRAM, + Unit: "seconds", + Help: "This is a histogram", + }, + }, + Timeseries: []prompb.TimeSeries{ + { + Samples: generateSamples(numSample), + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_metric"}, + {Name: "test_label_name_" + num, Value: labelValue + num}, + }, + }, + { + Histograms: generateHistograms(numSample), + Labels: []prompb.Label{ + {Name: "__name__", Value: "test_histogram"}, + {Name: "test_label_name_" + num, Value: labelValue + num}, + }, + }, + }, + } + reqs = append(reqs, req) + } + + ctx := context.Background() + b.ReportAllocs() + b.ResetTimer() + for _, req := range reqs { + err := exporter.execute(ctx, req) + require.NoError(b, err) + } +}