From 5f5d327abb0b3b8200d79b4f061b0416d50e7372 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 7 Oct 2024 13:36:30 -0300 Subject: [PATCH] [receiver/prometheusremotewrite] Parse labels into resource attributes Signed-off-by: Arthur Silva Sens --- .chloggen/prwreceiver-parselabels.yaml | 27 ++++ receiver/prometheusremotewritereceiver/go.mod | 2 + receiver/prometheusremotewritereceiver/go.sum | 6 + .../prometheusremotewritereceiver/receiver.go | 36 ++++- .../receiver_test.go | 130 +++++++++++++++++- 5 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 .chloggen/prwreceiver-parselabels.yaml diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml new file mode 100644 index 000000000000..6928cff0d60e --- /dev/null +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse labels from Prometheus Remote Write requests into Resource Attributes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35656] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api, user] diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index ea47cb53f314..99bbf240b877 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.0 @@ -46,6 +47,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.19.1 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index c4f537c6148a..be51ced48ace 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -316,6 +316,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0 h1:0MJmp4O7KUQOUmQYJEGNgtf30Nhx/3nLMn0jnU4Klhw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0/go.mod h1:4PYgwpscyZUUdQVLsd7dh+LXtm1QbWCvU47P3G/7tLg= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0 h1:Ld/1EUAQ6z3CirSyf4A8waHzUAZbMPrDOno+7tb0vKM= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0/go.mod h1:wAOT1iGOOTPTw2ysr0DW2Wrfi0/TECVgiGByRQfFiV4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 h1:kUUO8VNv/d9Tpx0NvOsRnUsz/JvZ8SWRnK+vT0cNjuU= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0/go.mod h1:SstR8PglIFBVGCZHS69bwJGl6TaCQQ5aLSEoas/8SRA= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 6bbdd267b994..28e3c6814668 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -14,6 +14,7 @@ import ( "github.com/gogo/protobuf/proto" promConfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" promRemote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" @@ -153,6 +154,37 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promCo // translateV2 translates a v2 remote-write request into OTLP metrics. // For now translateV2 is not implemented and returns an empty metrics. // nolint -func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promRemote.WriteResponseStats, int, error) { - return pmetric.NewMetrics(), promRemote.WriteResponseStats{}, 0, nil +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promRemote.WriteResponseStats, int, error) { + var ( + badRequestErrors []error + otelMetrics = pmetric.NewMetrics() + b = labels.NewScratchBuilder(0) + stats = promRemote.WriteResponseStats{} + ) + + resourceMetrics := otelMetrics.ResourceMetrics().AppendEmpty() + + for _, ts := range req.Timeseries { + ls := ts.ToLabels(&b, req.Symbols) + + if !ls.Has(labels.MetricName) { + badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels")) + continue + } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { + badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + continue + } + + ls = ls.DropMetricName() + for _, label := range ls { + resourceMetrics.Resource().Attributes().PutStr(label.Name, label.Value) + } + + } + + var statusCode int + if len(badRequestErrors) > 0 { + statusCode = http.StatusBadRequest + } + return otelMetrics, stats, statusCode, errors.Join(badRequestErrors...) } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 068d4caaa56a..f7101fa3fd89 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -13,14 +13,63 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" promConfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/histogram" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func setupServer(t *testing.T) { +var ( + testHistogram = histogram.Histogram{ + Schema: 2, + ZeroThreshold: 1e-128, + ZeroCount: 0, + Count: 0, + Sum: 20, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{1}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{-1}, + } + + writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_GAUGE, // writeV2RequestSeries1Metadata.Type. + + HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help. + UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit. + }, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: 1}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(1, &testHistogram), writev2.FromFloatHistogram(2, testHistogram.ToFloat(nil))}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. + Metadata: writev2.Metadata{ + Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries2Metadata.Type. + + HelpRef: 17, // Symbolized writeV2RequestSeries2Metadata.Help. + // No unit. + }, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{13, 14}, Value: 2, Timestamp: 2}}, + Histograms: []writev2.Histogram{writev2.FromIntHistogram(3, &testHistogram), writev2.FromFloatHistogram(4, testHistogram.ToFloat(nil))}, + }, + }, + } +) + +func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() factory := NewFactory() @@ -30,6 +79,13 @@ func setupServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, prwReceiver, "metrics receiver creation failed") + return prwReceiver.(*prometheusRemoteWriteReceiver) +} + +func setupServer(t *testing.T) { + t.Helper() + + prwReceiver := setupMetricsReceiver(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -98,3 +154,75 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { }) } } + +func TestTranslateV2(t *testing.T) { + prwReceiver := setupMetricsReceiver(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + for _, tc := range []struct { + name string + request *writev2.Request + expectError string + expectedMetrics pmetric.Metrics + expectedStats remote.WriteResponseStats + expectedCode int + }{ + { + name: "missing metric name", + request: &writev2.Request{ + Symbols: []string{"", "foo", "bar"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: "missing metric name in labels", + expectedCode: http.StatusBadRequest, + }, + { + name: "duplicate label", + request: &writev2.Request{ + Symbols: []string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: `duplicate label "__name__" in labels`, + expectedCode: http.StatusBadRequest, + }, + { + name: "valid request", + request: writeV2RequestFixture, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rmAttributes := expected.ResourceMetrics().AppendEmpty().Resource().Attributes() + rmAttributes.PutStr("b", "c") + rmAttributes.PutStr("baz", "qux") + rmAttributes.PutStr("d", "e") + rmAttributes.PutStr("foo", "bar") + return expected + }(), + expectedStats: remote.WriteResponseStats{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + metrics, stats, code, err := prwReceiver.translateV2(ctx, tc.request) + if tc.expectError != "" { + assert.ErrorContains(t, err, tc.expectError) + assert.Equal(t, tc.expectedCode, code) + return + } + + assert.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics)) + assert.Equal(t, tc.expectedStats, stats) + assert.Equal(t, tc.expectedCode, code) + }) + } +}