Skip to content

Commit

Permalink
[receiver/prometheusremotewrite] Parse labels (open-telemetry#35656)
Browse files Browse the repository at this point in the history
#### Description
This PR builds on top of
open-telemetry#35535,
open-telemetry#35565
and
open-telemetry#35624.

Here we're parsing labels into resource/metric attributes. It's still
not great because resource attributes (with exception to
`service.namespace`, `service.name` and `service.name.id`) are encoded
into a special metric called `target_info`. Metrics related to specific
target infos may arrive in separate write requests, so it may be
impossible to build the full OTLP metric in a stateless way.

In this PR I'm ignoring this problem 😛, and transforming `job` and
`instance` labels into resource attributes, while all other labels
become scope attributes.

Please focus on the latest commit when reviewing this PR :) 
1c9ff80

---------

Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens authored and AkhigbeEromo committed Jan 13, 2025
1 parent 7490e4e commit 728a583
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-parselabels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Parse labels from Prometheus Remote Write requests into Resource and Metric Attributes.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35656]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
10 changes: 9 additions & 1 deletion receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet
go 1.22.0

require (
github.com/cespare/xxhash/v2 v2.3.0
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0
github.com/prometheus/prometheus v0.54.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8
Expand All @@ -29,7 +31,6 @@ require (
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect
github.com/aws/aws-sdk-go v1.54.19 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -56,6 +57,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down Expand Up @@ -105,3 +107,9 @@ require (
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
114 changes: 111 additions & 3 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"strings"
"time"

"github.com/cespare/xxhash/v2"
"github.com/gogo/protobuf/proto"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
promremote "github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -150,8 +153,113 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
}

// translateV2 translates a v2 remote-write request into OTLP metrics.
// For now translateV2 is not implemented and returns an empty metrics.
// translate is not feature complete.
// nolint
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
var (
badRequestErrors error
otelMetrics = pmetric.NewMetrics()
labelsBuilder = labels.NewScratchBuilder(0)
stats = promremote.WriteResponseStats{}
// Prometheus Remote-Write can send multiple time series with the same labels in the same request.
// Instead of creating a whole new OTLP metric, we just append the new sample to the existing OTLP metric.
// This cache is called "intra" because in the future we'll have a "interRequestCache" to cache resourceAttributes
// between requests based on the metric "target_info".
intraRequestCache = make(map[uint64]pmetric.ResourceMetrics)
)

for _, ts := range req.Timeseries {
ls := ts.ToLabels(&labelsBuilder, req.Symbols)

if !ls.Has(labels.MetricName) {
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels"))
continue
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
continue
}

var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
intraCacheEntry, ok := intraRequestCache[hashedLabels]
if ok {
// We found the same time series in the same request, so we should append to the same OTLP metric.
rm = intraCacheEntry
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
intraRequestCache[hashedLabels] = rm
}

switch ts.Metadata.Type {
case writev2.Metadata_METRIC_TYPE_COUNTER:
addCounterDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_GAUGE:
addGaugeDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_SUMMARY:
addSummaryDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
addHistogramDatapoints(rm, ls, ts)
default:
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName)))
}
}

return otelMetrics, stats, badRequestErrors
}

// parseJobAndInstance turns the job and instance labels service resource attributes.
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/
func parseJobAndInstance(dest pcommon.Map, job, instance string) {
if instance != "" {
dest.PutStr("service.instance.id", instance)
}
if job != "" {
parts := strings.Split(job, "/")
if len(parts) == 2 {
dest.PutStr("service.namespace", parts[0])
dest.PutStr("service.name", parts[1])
return
}
dest.PutStr("service.name", job)
}
}

func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) {
// TODO: Cache metric name+type+unit and look up cache before creating new empty metric.
// In OTel name+type+unit is the unique identifier of a metric and we should not create
// a new metric if it already exists.

// TODO: Check if Scope is already present by comparing labels "otel_scope_name" and "otel_scope_version"
// with Scope.Name and Scope.Version. If it is present, we should append to the existing Scope.
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge()
addDatapoints(m.DataPoints(), ls, ts)
}

func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

// addDatapoints adds the labels to the datapoints attributes.
// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp,
// Timestamp, Value, etc.
func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) {
attributes := datapoints.AppendEmpty().Attributes()

for _, l := range ls {
if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace"
l.Name == labels.MetricName || // Becomes metric name
l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version
continue
}
attributes.PutStr(l.Name, l.Value)
}
}
120 changes: 119 additions & 1 deletion receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,38 @@ import (
"github.com/golang/snappy"
promconfig "github.com/prometheus/prometheus/config"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func setupServer(t *testing.T) {
var writeV2RequestFixture = &writev2.Request{
Symbols: []string{"", "__name__", "test_metric1", "job", "service-x/test", "instance", "107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
},
}

func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
t.Helper()

factory := NewFactory()
Expand All @@ -30,6 +55,13 @@ func setupServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

return prwReceiver.(*prometheusRemoteWriteReceiver)
}

func setupServer(t *testing.T) {
t.Helper()

prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

Expand Down Expand Up @@ -98,3 +130,89 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
})
}
}

func TestTranslateV2(t *testing.T) {
prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

for _, tc := range []struct {
name string
request *writev2.Request
expectError string
expectedMetrics pmetric.Metrics
expectedStats remote.WriteResponseStats
}{
{
name: "missing metric name",
request: &writev2.Request{
Symbols: []string{"", "foo", "bar"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: "missing metric name in labels",
},
{
name: "duplicate label",
request: &writev2.Request{
Symbols: []string{"", "__name__", "test"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: `duplicate label "__name__" in labels`,
},
{
name: "valid request",
request: writeV2RequestFixture,
expectedMetrics: func() pmetric.Metrics {
expected := pmetric.NewMetrics()
rm1 := expected.ResourceMetrics().AppendEmpty()
rmAttributes1 := rm1.Resource().Attributes()
rmAttributes1.PutStr("service.namespace", "service-x")
rmAttributes1.PutStr("service.name", "test")
rmAttributes1.PutStr("service.instance.id", "107cn001")
sm1 := rm1.ScopeMetrics().AppendEmpty()
sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
sm1Attributes.PutStr("d", "e")
sm1Attributes.PutStr("foo", "bar")
// Since we don't check "scope_name" and "scope_version", we end up with duplicated scope metrics for repeated series.
// TODO: Properly handle scope metrics.
sm2 := rm1.ScopeMetrics().AppendEmpty()
sm2Attributes := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
sm2Attributes.PutStr("d", "e")
sm2Attributes.PutStr("foo", "bar")

rm2 := expected.ResourceMetrics().AppendEmpty()
rmAttributes2 := rm2.Resource().Attributes()
rmAttributes2.PutStr("service.name", "foo")
rmAttributes2.PutStr("service.instance.id", "bar")
mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
mAttributes2.PutStr("d", "e")
mAttributes2.PutStr("foo", "bar")

return expected
}(),
expectedStats: remote.WriteResponseStats{},
},
} {
t.Run(tc.name, func(t *testing.T) {
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
return
}

assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
assert.Equal(t, tc.expectedStats, stats)
})
}
}

0 comments on commit 728a583

Please sign in to comment.