From 9d7f6f2c7ac3b0711342abfbdfa08ed33b5da130 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Mon, 30 Sep 2024 09:11:21 +0200 Subject: [PATCH 1/4] [receiver/cloudfoundry]: move datapoint level attributes to resource level Signed-off-by: odubajDT --- .chloggen/move-attributes.yaml | 27 + receiver/cloudfoundryreceiver/README.md | 33 +- receiver/cloudfoundryreceiver/converter.go | 74 +- .../cloudfoundryreceiver/converter_test.go | 248 +++++-- receiver/cloudfoundryreceiver/go.mod | 5 +- receiver/cloudfoundryreceiver/go.sum | 36 +- receiver/cloudfoundryreceiver/receiver.go | 78 ++- .../cloudfoundryreceiver/receiver_test.go | 655 ++++++++++++++++++ 8 files changed, 1061 insertions(+), 95 deletions(-) create mode 100644 .chloggen/move-attributes.yaml diff --git a/.chloggen/move-attributes.yaml b/.chloggen/move-attributes.yaml new file mode 100644 index 000000000000..b63b3992b981 --- /dev/null +++ b/.chloggen/move-attributes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cloudfoundryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a feature gate enable copying envelope tags to the metrics as resource attributes instead of datapoint attributes. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34824] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/cloudfoundryreceiver/README.md b/receiver/cloudfoundryreceiver/README.md index 322c01948d8e..1e1812a03df9 100644 --- a/receiver/cloudfoundryreceiver/README.md +++ b/receiver/cloudfoundryreceiver/README.md @@ -132,4 +132,35 @@ The receiver maps the envelope attribute tags to the following OpenTelemetry att * `instance_id` - numerical index of the origin. If origin is `rep` (`source_type` is `APP`) this is the application index. However, for other cases this is the instance index. * `process_id` - process ID (GUID) * `process_instance_id` - unique ID of a process instance, should be treated as an opaque string -* `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of other processes \ No newline at end of file +* `process_type` - process type. Each application has exactly one process of type `web`, but many have any number of other processes + +## Feature Gate + +### `cloudfoundry.resourceAttributes.allow` + +#### Purpose + +The `cloudfoundry.resourceAttributes.allow` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) allows the envelope tags being copied to the metrics as resource attributes instead of datapoint attributes (default `false`). +Therefore all `org.cloudfoundry.*` datapoint attributes won't be present anymore on metrics datapoint level, but on resource level instead, since the attributes describe the resource and not the datapoints itself. + +The `cloudfoundry.resourceAttributes.allow` feature gate is available since version `v0.109.0` and will be held at least for 2 versions (`v0.111.0`) until promoting to `beta` and another 2 vesions (`v0.113.0`) until promoting to `stable`. + +Below you can see the list of attributes that are present the resource level instead of datapoint level (when `cloudfoundry.resourceAttributes.allow` feature gate is enabled): + +``` + - org.cloudfoundry.index + - org.cloudfoundry.ip + - org.cloudfoundry.deployment + - org.cloudfoundry.id + - org.cloudfoundry.job + - org.cloudfoundry.product + - org.cloudfoundry.instance_group + - org.cloudfoundry.instance_id + - org.cloudfoundry.origin + - org.cloudfoundry.system_domain + - org.cloudfoundry.source_id + - org.cloudfoundry.source_type + - org.cloudfoundry.process_type + - org.cloudfoundry.process_id + - org.cloudfoundry.process_instance_id +``` diff --git a/receiver/cloudfoundryreceiver/converter.go b/receiver/cloudfoundryreceiver/converter.go index b03cac337df9..9b37c12a133a 100644 --- a/receiver/cloudfoundryreceiver/converter.go +++ b/receiver/cloudfoundryreceiver/converter.go @@ -5,9 +5,11 @@ package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry- import ( "fmt" + "slices" "time" "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -17,6 +19,31 @@ const ( attributeNamePrefix = "org.cloudfoundry." ) +var ResourceAttributesKeys = []string{ + "index", + "ip", + "deployment", + "id", + "job", + "product", + "instance_group", + "instance_id", + "origin", + "system_domain", + "source_id", + "source_type", + "process_type", + "process_id", + "process_instance_id", +} + +var allowResourceAttributes = featuregate.GlobalRegistry().MustRegister( + "cloudfoundry.resourceAttributes.allow", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, envelope tags are copied to the metrics as resource attributes instead of datapoint attributes"), + featuregate.WithRegisterFromVersion("v0.109.0"), +) + func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pmetric.MetricSlice, startTime time.Time) { namePrefix := envelope.Tags["origin"] + "." @@ -28,7 +55,12 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme dataPoint.SetDoubleValue(float64(message.Counter.GetTotal())) dataPoint.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - copyEnvelopeAttributes(dataPoint.Attributes(), envelope) + if allowResourceAttributes.IsEnabled() { + attrs := getEnvelopeDatapointAttributes(envelope) + attrs.CopyTo(dataPoint.Attributes()) + } else { + copyEnvelopeAttributes(dataPoint.Attributes(), envelope) + } case *loggregator_v2.Envelope_Gauge: for name, value := range message.Gauge.GetMetrics() { metric := metricSlice.AppendEmpty() @@ -37,7 +69,12 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme dataPoint.SetDoubleValue(value.Value) dataPoint.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) - copyEnvelopeAttributes(dataPoint.Attributes(), envelope) + if allowResourceAttributes.IsEnabled() { + attrs := getEnvelopeDatapointAttributes(envelope) + attrs.CopyTo(dataPoint.Attributes()) + } else { + copyEnvelopeAttributes(dataPoint.Attributes(), envelope) + } } } } @@ -59,7 +96,12 @@ func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogR default: return fmt.Errorf("unsupported envelope log type: %s", envelope.GetLog().GetType()) } - copyEnvelopeAttributes(log.Attributes(), envelope) + if allowResourceAttributes.IsEnabled() { + attrs := getEnvelopeDatapointAttributes(envelope) + attrs.CopyTo(log.Attributes()) + } else { + copyEnvelopeAttributes(log.Attributes(), envelope) + } return nil } @@ -74,3 +116,29 @@ func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Env attributes.PutStr(attributeNamePrefix+"instance_id", envelope.InstanceId) } } + +func getEnvelopeDatapointAttributes(envelope *loggregator_v2.Envelope) pcommon.Map { + attrs := pcommon.NewMap() + for key, value := range envelope.Tags { + if !slices.Contains(ResourceAttributesKeys, key) { + attrs.PutStr(attributeNamePrefix+key, value) + } + } + return attrs +} + +func getEnvelopeResourceAttributes(envelope *loggregator_v2.Envelope) pcommon.Map { + attrs := pcommon.NewMap() + for key, value := range envelope.Tags { + if slices.Contains(ResourceAttributesKeys, key) { + attrs.PutStr(attributeNamePrefix+key, value) + } + } + if envelope.SourceId != "" { + attrs.PutStr(attributeNamePrefix+"source_id", envelope.SourceId) + } + if envelope.InstanceId != "" { + attrs.PutStr(attributeNamePrefix+"instance_id", envelope.InstanceId) + } + return attrs +} diff --git a/receiver/cloudfoundryreceiver/converter_test.go b/receiver/cloudfoundryreceiver/converter_test.go index 0953e8f400f8..0346c813bf1b 100644 --- a/receiver/cloudfoundryreceiver/converter_test.go +++ b/receiver/cloudfoundryreceiver/converter_test.go @@ -10,6 +10,7 @@ import ( "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -28,6 +29,7 @@ func TestConvertCountEnvelope(t *testing.T) { "job": "router", "index": "bc276108-8282-48a5-bae7-c009c4392246", "ip": "10.244.0.34", + "custom": "datapoint", }, Message: &loggregator_v2.Envelope_Counter{ Counter: &loggregator_v2.Counter{ @@ -38,30 +40,64 @@ func TestConvertCountEnvelope(t *testing.T) { }, } - metricSlice := pmetric.NewMetricSlice() - - convertEnvelopeToMetrics(&envelope, metricSlice, before) - - require.Equal(t, 1, metricSlice.Len()) - - metric := metricSlice.At(0) - assert.Equal(t, "gorouter.bad_gateways", metric.Name()) - assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) - dataPoints := metric.Sum().DataPoints() - assert.Equal(t, 1, dataPoints.Len()) - dataPoint := dataPoints.At(0) - assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) - assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) - assert.Equal(t, 10.0, dataPoint.DoubleValue()) - - assertAttributes(t, map[string]string{ + expectedAttributes := map[string]string{ "org.cloudfoundry.source_id": "uaa", "org.cloudfoundry.origin": "gorouter", "org.cloudfoundry.deployment": "cf", "org.cloudfoundry.job": "router", "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", "org.cloudfoundry.ip": "10.244.0.34", - }, dataPoint.Attributes()) + "org.cloudfoundry.custom": "datapoint", + } + + tests := []struct { + name string + envelope loggregator_v2.Envelope + expected map[string]string + resourceAttrs bool + }{ + { + name: "resource attributes true", + envelope: envelope, + expected: map[string]string{ + "org.cloudfoundry.custom": "datapoint", + }, + resourceAttrs: true, + }, + { + name: "resource attributes false", + envelope: envelope, + expected: expectedAttributes, + resourceAttrs: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourceMetricSlice := pmetric.NewResourceMetricsSlice() + + if tt.resourceAttrs { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + } + + convertEnvelopeToMetrics(&tt.envelope, resourceMetricSlice.AppendEmpty().ScopeMetrics().AppendEmpty().Metrics(), before) + + require.Equal(t, 1, resourceMetricSlice.Len()) + + metric := resourceMetricSlice.At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "gorouter.bad_gateways", metric.Name()) + assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) + dataPoints := metric.Sum().DataPoints() + assert.Equal(t, 1, dataPoints.Len()) + dataPoint := dataPoints.At(0) + assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) + assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) + assert.Equal(t, 10.0, dataPoint.DoubleValue()) + assertAttributes(t, tt.expected, dataPoint.Attributes()) + }) + } } func TestConvertGaugeEnvelope(t *testing.T) { @@ -81,6 +117,7 @@ func TestConvertGaugeEnvelope(t *testing.T) { "job": "compute", "index": "7505d2c9-beab-4aaa-afe3-41322ebcd13d", "ip": "10.0.4.8", + "custom": "datapoint", }, Message: &loggregator_v2.Envelope_Gauge{ Gauge: &loggregator_v2.Gauge{ @@ -109,38 +146,74 @@ func TestConvertGaugeEnvelope(t *testing.T) { "org.cloudfoundry.job": "compute", "org.cloudfoundry.index": "7505d2c9-beab-4aaa-afe3-41322ebcd13d", "org.cloudfoundry.ip": "10.0.4.8", + "org.cloudfoundry.custom": "datapoint", + } + + tests := []struct { + name string + envelope loggregator_v2.Envelope + expected map[string]string + resourceAttrs bool + }{ + { + name: "resource attributes true", + envelope: envelope, + expected: map[string]string{ + "org.cloudfoundry.custom": "datapoint", + }, + resourceAttrs: true, + }, + { + name: "resource attributes false", + envelope: envelope, + expected: expectedAttributes, + resourceAttrs: false, + }, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resourceMetricSlice := pmetric.NewResourceMetricsSlice() - metricSlice := pmetric.NewMetricSlice() + if tt.resourceAttrs { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + } - convertEnvelopeToMetrics(&envelope, metricSlice, before) + convertEnvelopeToMetrics(&tt.envelope, resourceMetricSlice.AppendEmpty().ScopeMetrics().AppendEmpty().Metrics(), before) - require.Equal(t, 2, metricSlice.Len()) - memoryMetricPosition := 0 + require.Equal(t, 1, resourceMetricSlice.Len()) - if metricSlice.At(1).Name() == "rep.memory" { - memoryMetricPosition = 1 - } + metricSlice := resourceMetricSlice.At(0).ScopeMetrics().At(0).Metrics() + require.Equal(t, 2, metricSlice.Len()) + memoryMetricPosition := 0 + + if metricSlice.At(1).Name() == "rep.memory" { + memoryMetricPosition = 1 + } + + metric := metricSlice.At(memoryMetricPosition) + assert.Equal(t, "rep.memory", metric.Name()) + assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) + assert.Equal(t, 1, metric.Gauge().DataPoints().Len()) + dataPoint := metric.Gauge().DataPoints().At(0) + assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) + assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) + assert.Equal(t, 17046641.0, dataPoint.DoubleValue()) + assertAttributes(t, tt.expected, dataPoint.Attributes()) - metric := metricSlice.At(memoryMetricPosition) - assert.Equal(t, "rep.memory", metric.Name()) - assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) - assert.Equal(t, 1, metric.Gauge().DataPoints().Len()) - dataPoint := metric.Gauge().DataPoints().At(0) - assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) - assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) - assert.Equal(t, 17046641.0, dataPoint.DoubleValue()) - assertAttributes(t, expectedAttributes, dataPoint.Attributes()) - - metric = metricSlice.At(1 - memoryMetricPosition) - assert.Equal(t, "rep.disk", metric.Name()) - assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) - assert.Equal(t, 1, metric.Gauge().DataPoints().Len()) - dataPoint = metric.Gauge().DataPoints().At(0) - assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) - assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) - assert.Equal(t, 10231808.0, dataPoint.DoubleValue()) - assertAttributes(t, expectedAttributes, dataPoint.Attributes()) + metric = metricSlice.At(1 - memoryMetricPosition) + assert.Equal(t, "rep.disk", metric.Name()) + assert.Equal(t, pmetric.MetricTypeGauge, metric.Type()) + assert.Equal(t, 1, metric.Gauge().DataPoints().Len()) + dataPoint = metric.Gauge().DataPoints().At(0) + assert.Equal(t, pcommon.NewTimestampFromTime(now), dataPoint.Timestamp()) + assert.Equal(t, pcommon.NewTimestampFromTime(before), dataPoint.StartTimestamp()) + assert.Equal(t, 10231808.0, dataPoint.DoubleValue()) + assertAttributes(t, tt.expected, dataPoint.Attributes()) + }) + } } func TestConvertLogsEnvelope(t *testing.T) { @@ -148,16 +221,19 @@ func TestConvertLogsEnvelope(t *testing.T) { before := time.Now().Add(-time.Second) t.Parallel() tests := []struct { - id string - envelope loggregator_v2.Envelope - expected map[string]any + id string + envelope loggregator_v2.Envelope + expected map[string]any + resourceAttrs bool }{ { id: "normal-without-sourcetype-tag", envelope: loggregator_v2.Envelope{ Timestamp: before.UnixNano(), SourceId: "744e75bb-69d1-4cf4-b037-76875368097b", - Tags: map[string]string{}, + Tags: map[string]string{ + "custom": "datapoint", + }, Message: &loggregator_v2.Envelope_Log{ Log: &loggregator_v2.Log{ Payload: []byte(`test-app. Says Hello. on index: 0`), @@ -169,11 +245,38 @@ func TestConvertLogsEnvelope(t *testing.T) { "Timestamp": before, "Attributes": map[string]string{ "org.cloudfoundry.source_id": "744e75bb-69d1-4cf4-b037-76875368097b", + "org.cloudfoundry.custom": "datapoint", + }, + "Body": `test-app. Says Hello. on index: 0`, + "SeverityNumber": plog.SeverityNumberInfo, + "SeverityText": plog.SeverityNumberInfo.String(), + }, + }, + { + id: "normal-without-sourcetype-tag-resource-attrs", + envelope: loggregator_v2.Envelope{ + Timestamp: before.UnixNano(), + SourceId: "744e75bb-69d1-4cf4-b037-76875368097b", + Tags: map[string]string{ + "custom": "datapoint", + }, + Message: &loggregator_v2.Envelope_Log{ + Log: &loggregator_v2.Log{ + Payload: []byte(`test-app. Says Hello. on index: 0`), + Type: loggregator_v2.Log_OUT, + }, + }, + }, + expected: map[string]any{ + "Timestamp": before, + "Attributes": map[string]string{ + "org.cloudfoundry.custom": "datapoint", }, "Body": `test-app. Says Hello. on index: 0`, "SeverityNumber": plog.SeverityNumberInfo, "SeverityText": plog.SeverityNumberInfo.String(), }, + resourceAttrs: true, }, { id: "json-log-with-sourcetype-error", @@ -187,6 +290,7 @@ func TestConvertLogsEnvelope(t *testing.T) { "job": "diego-cell", "index": "bc276108-8282-48a5-bae7-c009c4392246", "ip": "10.80.0.2", + "custom": "datapoint", }, Message: &loggregator_v2.Envelope_Log{ Log: &loggregator_v2.Log{ @@ -205,21 +309,61 @@ func TestConvertLogsEnvelope(t *testing.T) { "org.cloudfoundry.job": "diego-cell", "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", "org.cloudfoundry.ip": "10.80.0.2", + "org.cloudfoundry.custom": "datapoint", + }, + "Body": `{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`, + "SeverityNumber": plog.SeverityNumberError, + "SeverityText": plog.SeverityNumberError.String(), + }, + }, + { + id: "json-log-with-sourcetype-error-resource-attrs", + envelope: loggregator_v2.Envelope{ + Timestamp: before.UnixNano(), + SourceId: "df75aec8-b937-4dc8-9b4d-c336e36e3895", + Tags: map[string]string{ + "source_type": "APP/PROC/WEB", + "origin": "rep", + "deployment": "cf", + "job": "diego-cell", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.80.0.2", + "custom": "datapoint", + }, + Message: &loggregator_v2.Envelope_Log{ + Log: &loggregator_v2.Log{ + Payload: []byte(`{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`), + Type: loggregator_v2.Log_ERR, + }, + }, + }, + expected: map[string]any{ + "Timestamp": before, + "Attributes": map[string]string{ + "org.cloudfoundry.custom": "datapoint", }, "Body": `{"timestamp":"2024-05-29T16:16:28.063062903Z","level":"info","source":"guardian","message":"guardian.api.garden-server.get-properties.got-properties","data":{"handle":"e885e8be-c6a7-43b1-5066-a821","session":"2.1.209666"}}`, "SeverityNumber": plog.SeverityNumberError, "SeverityText": plog.SeverityNumberError.String(), }, + resourceAttrs: true, }, } for i := range tests { tt := tests[i] t.Run(tt.id, func(t *testing.T) { - logSlice := plog.NewLogRecordSlice() - e := convertEnvelopeToLogs(&tt.envelope, logSlice, now) + resourceLogSlice := plog.NewResourceLogsSlice() + if tt.resourceAttrs { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + } + e := convertEnvelopeToLogs(&tt.envelope, resourceLogSlice.AppendEmpty().ScopeLogs().AppendEmpty().LogRecords(), now) require.NoError(t, e) - require.Equal(t, 1, logSlice.Len()) - log := logSlice.At(0) + require.Equal(t, 1, resourceLogSlice.Len()) + + log := resourceLogSlice.At(0).ScopeLogs().At(0).LogRecords().At(0) assert.Equal(t, tt.expected["Body"], log.Body().AsString()) assert.Equal(t, tt.expected["SeverityText"], log.SeverityText()) assert.Equal(t, pcommon.NewTimestampFromTime(tt.expected["Timestamp"].(time.Time)), log.Timestamp()) diff --git a/receiver/cloudfoundryreceiver/go.mod b/receiver/cloudfoundryreceiver/go.mod index 77e89aaafa23..90ac1e559e67 100644 --- a/receiver/cloudfoundryreceiver/go.mod +++ b/receiver/cloudfoundryreceiver/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/collector/confmap v1.22.0 go.opentelemetry.io/collector/consumer v1.22.0 go.opentelemetry.io/collector/consumer/consumertest v0.116.0 + go.opentelemetry.io/collector/featuregate v1.22.0 go.opentelemetry.io/collector/pdata v1.22.0 go.opentelemetry.io/collector/receiver v0.116.0 go.opentelemetry.io/collector/receiver/receivertest v0.116.0 @@ -23,7 +24,7 @@ require ( ) require ( - code.cloudfoundry.org/go-diodes v0.0.0-20211115184647-b584dd5df32c // indirect + code.cloudfoundry.org/go-diodes v0.0.0-20241007161556-ec30366c7912 // indirect code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -35,6 +36,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -45,7 +47,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/onsi/ginkgo v1.16.5 // indirect - github.com/onsi/gomega v1.17.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect diff --git a/receiver/cloudfoundryreceiver/go.sum b/receiver/cloudfoundryreceiver/go.sum index 8f1330566bce..eb07f375663d 100644 --- a/receiver/cloudfoundryreceiver/go.sum +++ b/receiver/cloudfoundryreceiver/go.sum @@ -1,5 +1,5 @@ -code.cloudfoundry.org/go-diodes v0.0.0-20211115184647-b584dd5df32c h1:N2GMlHc/SJQk7BkaME/kDHaciVTy4NuRmxVJLhnqKK8= -code.cloudfoundry.org/go-diodes v0.0.0-20211115184647-b584dd5df32c/go.mod h1:o7lq/SmHshDVxHdRJ/fMT3VPcoXyE1HcRXbG8QibO3k= +code.cloudfoundry.org/go-diodes v0.0.0-20241007161556-ec30366c7912 h1:4Lj5tSZsoT1Fn98SdQHt4pjaUeopFH7/7pFHpuMJCyM= +code.cloudfoundry.org/go-diodes v0.0.0-20241007161556-ec30366c7912/go.mod h1:UswWmjC8YkMF7v0jCgMOWNRfqJoTTBbpRzX3Emk2HBU= code.cloudfoundry.org/go-loggregator v7.4.0+incompatible h1:KqZYloMQWM5Zg/BQKunOIA4OODh7djZbk48qqbowNFI= code.cloudfoundry.org/go-loggregator v7.4.0+incompatible/go.mod h1:KPBTRqj+y738Nhf1+g4JHFaBU8j7dedirR5ETNHvMXU= code.cloudfoundry.org/rfc5424 v0.0.0-20201103192249-000122071b78 h1:mrZQaZmuDIPhSp6b96b+CRKC2uH44ifa5cjDV2epKis= @@ -20,7 +20,10 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -32,8 +35,6 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= @@ -41,12 +42,15 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSFBy+X1V0o+l+8NF1avt4HWl7cA= +github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= @@ -78,15 +82,14 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= +github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= -github.com/onsi/gomega v1.17.0 h1:9Luw4uT5HTjHTN8+aNcSThgH1vdXnmdJ8xIfZ4wyTRE= -github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= +github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -140,6 +143,8 @@ go.opentelemetry.io/collector/extension/auth v0.116.0 h1:oolKkFBIS4vhJ4ZWTD4Bp+3 go.opentelemetry.io/collector/extension/auth v0.116.0/go.mod h1:3WeZgIiiP7wcB+tID4G3ml6J/R2oJ359PxQh/pUFnSk= go.opentelemetry.io/collector/extension/auth/authtest v0.116.0 h1:KcMvjb4R0wpkmmi7EOk7zT5sgl7uwXY/VQfMEUVYcLM= go.opentelemetry.io/collector/extension/auth/authtest v0.116.0/go.mod h1:zyWTdh+CUKh7BbszTWUWp806NA6EDyix77O4Q6XaOA8= +go.opentelemetry.io/collector/featuregate v1.22.0 h1:1TUcdqA5VpEsX1Lrr6GG15CptZxDXxiu5AXgwpeNSR4= +go.opentelemetry.io/collector/featuregate v1.22.0/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= go.opentelemetry.io/collector/pdata v1.22.0 h1:3yhjL46NLdTMoP8rkkcE9B0pzjf2973crn0KKhX5UrI= go.opentelemetry.io/collector/pdata v1.22.0/go.mod h1:nLLf6uDg8Kn5g3WNZwGyu8+kf77SwOqQvMTb5AXEbEY= go.opentelemetry.io/collector/pdata/pprofile v0.116.0 h1:iE6lqkO7Hi6lTIIml1RI7yQ55CKqW12R2qHinwF5Zuk= @@ -183,7 +188,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -198,15 +202,11 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -214,6 +214,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -228,8 +230,6 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -241,7 +241,5 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/receiver/cloudfoundryreceiver/receiver.go b/receiver/cloudfoundryreceiver/receiver.go index 4084af4f5650..c5d3deb06d3b 100644 --- a/receiver/cloudfoundryreceiver/receiver.go +++ b/receiver/cloudfoundryreceiver/receiver.go @@ -7,10 +7,12 @@ import ( "context" "errors" "fmt" + "reflect" "sync" "time" "code.cloudfoundry.org/go-loggregator" + "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" @@ -171,15 +173,12 @@ func (cfr *cloudFoundryReceiver) streamMetrics( break } metrics := pmetric.NewMetrics() - libraryMetrics := createLibraryMetricsSlice(metrics) for _, envelope := range envelopes { if envelope != nil { - // There is no concept of startTime in CF loggregator, and we do not know the uptime of the component - // from which the metric originates, so just provide receiver start time as metric start time - convertEnvelopeToMetrics(envelope, libraryMetrics, cfr.receiverStartTime) + buildMetrics(metrics, envelope, cfr.receiverStartTime) } } - if libraryMetrics.Len() > 0 { + if metrics.ResourceMetrics().Len() > 0 { obsCtx := cfr.obsrecv.StartMetricsOp(ctx) err := cfr.nextMetrics.ConsumeMetrics(ctx, metrics) if err != nil { @@ -209,14 +208,13 @@ func (cfr *cloudFoundryReceiver) streamLogs( break } logs := plog.NewLogs() - libraryLogs := createLibraryLogsSlice(logs) observedTime := time.Now() for _, envelope := range envelopes { if envelope != nil { - _ = convertEnvelopeToLogs(envelope, libraryLogs, observedTime) + buildLogs(logs, envelope, observedTime) } } - if libraryLogs.Len() > 0 { + if logs.ResourceLogs().Len() > 0 { obsCtx := cfr.obsrecv.StartLogsOp(ctx) err := cfr.nextLogs.ConsumeLogs(ctx, logs) if err != nil { @@ -227,16 +225,60 @@ func (cfr *cloudFoundryReceiver) streamLogs( } } -func createLibraryMetricsSlice(metrics pmetric.Metrics) pmetric.MetricSlice { - resourceMetric := metrics.ResourceMetrics().AppendEmpty() - libraryMetrics := resourceMetric.ScopeMetrics().AppendEmpty() - libraryMetrics.Scope().SetName(metadata.ScopeName) - return libraryMetrics.Metrics() +func buildLogs(logs plog.Logs, envelope *loggregator_v2.Envelope, observedTime time.Time) { + resourceLogs := getResourceLogs(logs, envelope) + setupLogsScope(resourceLogs) + _ = convertEnvelopeToLogs(envelope, resourceLogs.ScopeLogs().At(0).LogRecords(), observedTime) } -func createLibraryLogsSlice(logs plog.Logs) plog.LogRecordSlice { - resourceLog := logs.ResourceLogs().AppendEmpty() - libraryLogs := resourceLog.ScopeLogs().AppendEmpty() - libraryLogs.Scope().SetName(metadata.ScopeName) - return libraryLogs.LogRecords() +func buildMetrics(metrics pmetric.Metrics, envelope *loggregator_v2.Envelope, observedTime time.Time) { + resourceMetrics := getResourceMetrics(metrics, envelope) + setupMetricsScope(resourceMetrics) + convertEnvelopeToMetrics(envelope, resourceMetrics.ScopeMetrics().At(0).Metrics(), observedTime) +} + +func setupMetricsScope(resourceMetrics pmetric.ResourceMetrics) { + if resourceMetrics.ScopeMetrics().Len() == 0 { + libraryMetrics := resourceMetrics.ScopeMetrics().AppendEmpty() + libraryMetrics.Scope().SetName(metadata.ScopeName) + } +} + +func getResourceMetrics(metrics pmetric.Metrics, envelope *loggregator_v2.Envelope) pmetric.ResourceMetrics { + if !allowResourceAttributes.IsEnabled() { + return metrics.ResourceMetrics().AppendEmpty() + } + + attrs := getEnvelopeResourceAttributes(envelope) + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { + if reflect.DeepEqual(metrics.ResourceMetrics().At(i).Resource().Attributes().AsRaw(), attrs.AsRaw()) { + return metrics.ResourceMetrics().At(i) + } + } + resource := metrics.ResourceMetrics().AppendEmpty() + attrs.CopyTo(resource.Resource().Attributes()) + return resource +} + +func setupLogsScope(resourceLogs plog.ResourceLogs) { + if resourceLogs.ScopeLogs().Len() == 0 { + libraryLogs := resourceLogs.ScopeLogs().AppendEmpty() + libraryLogs.Scope().SetName(metadata.ScopeName) + } +} + +func getResourceLogs(logs plog.Logs, envelope *loggregator_v2.Envelope) plog.ResourceLogs { + if !allowResourceAttributes.IsEnabled() { + return logs.ResourceLogs().AppendEmpty() + } + + attrs := getEnvelopeResourceAttributes(envelope) + for i := 0; i < logs.ResourceLogs().Len(); i++ { + if reflect.DeepEqual(logs.ResourceLogs().At(i).Resource().Attributes().AsRaw(), attrs.AsRaw()) { + return logs.ResourceLogs().At(i) + } + } + resource := logs.ResourceLogs().AppendEmpty() + attrs.CopyTo(resource.Resource().Attributes()) + return resource } diff --git a/receiver/cloudfoundryreceiver/receiver_test.go b/receiver/cloudfoundryreceiver/receiver_test.go index eec1da92e50c..9bbe5e1d21c0 100644 --- a/receiver/cloudfoundryreceiver/receiver_test.go +++ b/receiver/cloudfoundryreceiver/receiver_test.go @@ -6,11 +6,18 @@ package cloudfoundryreceiver import ( "context" "testing" + "time" + "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver/internal/metadata" ) // Test to make sure a new metrics receiver can be created properly, started and shutdown with the default config @@ -62,3 +69,651 @@ func TestDefaultValidLogsReceiver(t *testing.T) { err = receiver.Shutdown(ctx) require.NoError(t, err) } + +func TestSetupMetricsScope(t *testing.T) { + resourceSetup := pmetric.NewResourceMetrics() + scope := resourceSetup.ScopeMetrics().AppendEmpty() + scope.Scope().SetName(metadata.ScopeName) + + tests := []struct { + name string + in pmetric.ResourceMetrics + out pmetric.ResourceMetrics + }{ + { + name: "scope empty", + in: pmetric.NewResourceMetrics(), + out: resourceSetup, + }, + { + name: "scope set", + in: resourceSetup, + out: resourceSetup, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setupMetricsScope(tt.in) + require.Equal(t, tt.out, tt.in) + }) + } +} + +func TestGetResourceMetrics(t *testing.T) { + metrics := pmetric.NewMetrics() + resource := metrics.ResourceMetrics().AppendEmpty() + resource.Resource().Attributes().PutStr("org.cloudfoundry.origin", "rep") + + resource2 := pmetric.NewResourceMetrics() + resource2.Resource().Attributes().PutStr("org.cloudfoundry.origin", "rep2") + + tests := []struct { + name string + resourceAttrs bool + envelope loggregator_v2.Envelope + metrics pmetric.Metrics + expected pmetric.ResourceMetrics + }{ + { + name: "resource attributes true", + envelope: loggregator_v2.Envelope{}, + resourceAttrs: false, + metrics: pmetric.NewMetrics(), + expected: pmetric.NewResourceMetrics(), + }, + { + name: "empty metrics", + envelope: loggregator_v2.Envelope{}, + resourceAttrs: true, + metrics: pmetric.NewMetrics(), + expected: pmetric.NewResourceMetrics(), + }, + { + name: "matched resource metrics", + envelope: loggregator_v2.Envelope{ + Tags: map[string]string{ + "origin": "rep", + "custom": "datapoint", + }, + }, + resourceAttrs: true, + metrics: metrics, + expected: resource, + }, + { + name: "non-matched resource metrics", + envelope: loggregator_v2.Envelope{ + Tags: map[string]string{ + "origin": "rep2", + "custom": "datapoint", + }, + }, + resourceAttrs: true, + metrics: metrics, + expected: resource2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.resourceAttrs { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + } + require.Equal(t, tt.expected, getResourceMetrics(tt.metrics, &tt.envelope)) + }) + } +} + +func TestSetupLogsScope(t *testing.T) { + resourceSetup := plog.NewResourceLogs() + scope := resourceSetup.ScopeLogs().AppendEmpty() + scope.Scope().SetName(metadata.ScopeName) + + tests := []struct { + name string + in plog.ResourceLogs + out plog.ResourceLogs + }{ + { + name: "scope empty", + in: plog.NewResourceLogs(), + out: resourceSetup, + }, + { + name: "scope set", + in: resourceSetup, + out: resourceSetup, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setupLogsScope(tt.in) + require.Equal(t, tt.out, tt.in) + }) + } +} + +func TestGetResourceLogs(t *testing.T) { + logs := plog.NewLogs() + resource := logs.ResourceLogs().AppendEmpty() + resource.Resource().Attributes().PutStr("org.cloudfoundry.origin", "rep") + + resource2 := plog.NewResourceLogs() + resource2.Resource().Attributes().PutStr("org.cloudfoundry.origin", "rep2") + + tests := []struct { + name string + resourceAttrs bool + envelope loggregator_v2.Envelope + logs plog.Logs + expected plog.ResourceLogs + }{ + { + name: "resource attributes true", + envelope: loggregator_v2.Envelope{}, + resourceAttrs: false, + logs: plog.NewLogs(), + expected: plog.NewResourceLogs(), + }, + { + name: "empty logs", + envelope: loggregator_v2.Envelope{}, + resourceAttrs: true, + logs: plog.NewLogs(), + expected: plog.NewResourceLogs(), + }, + { + name: "matched resource logs", + envelope: loggregator_v2.Envelope{ + Tags: map[string]string{ + "origin": "rep", + "custom": "datapoint", + }, + }, + resourceAttrs: true, + logs: logs, + expected: resource, + }, + { + name: "non-matched resource logs", + envelope: loggregator_v2.Envelope{ + Tags: map[string]string{ + "origin": "rep2", + "custom": "datapoint", + }, + }, + resourceAttrs: true, + logs: logs, + expected: resource2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.resourceAttrs { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + } + require.Equal(t, tt.expected, getResourceLogs(tt.logs, &tt.envelope)) + }) + } +} + +func TestBuildLogsWithResourceAttrs(t *testing.T) { + logs := plog.NewLogs() + observedTime := time.Date(2022, time.July, 14, 9, 30, 0, 0, time.UTC) + + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + + // adding the first item to the logs + + envelope := &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom": "datapoint", + }, + } + + buildLogs(logs, envelope, observedTime) + + // check if the first log resource was created successfully + // we await single resource, with single scope and single log + require.Equal(t, 1, logs.ResourceLogs().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, logs.ResourceLogs().At(0).Resource().Attributes().AsRaw()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + }, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) + + // adding another envelope matching the same resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom2": "datapoint2", + }, + } + + buildLogs(logs, envelope, observedTime) + + // check that the first log resource contains a single scope, but 2 logs + require.Equal(t, 1, logs.ResourceLogs().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, logs.ResourceLogs().At(0).Resource().Attributes().AsRaw()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 2, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + }, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom2": "datapoint2", + }, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().AsRaw()) + + // adding another envelope not matching the resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa33", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom3": "datapoint3", + }, + } + + buildLogs(logs, envelope, observedTime) + + // check that the new resource was created and exists next to the original one + require.Equal(t, 2, logs.ResourceLogs().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa33", + }, logs.ResourceLogs().At(1).Resource().Attributes().AsRaw()) + require.Equal(t, 1, logs.ResourceLogs().At(1).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(1).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 1, logs.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom3": "datapoint3", + }, logs.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) +} + +func TestBuildLogs(t *testing.T) { + logs := plog.NewLogs() + observedTime := time.Date(2022, time.July, 14, 9, 30, 0, 0, time.UTC) + + // adding the first item to the logs + + envelope := &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom": "datapoint", + }, + } + + buildLogs(logs, envelope, observedTime) + + // check if the first log resource was created successfully + // we await single resource, with single scope and single log + require.Equal(t, 1, logs.ResourceLogs().Len()) + require.Equal(t, 0, logs.ResourceLogs().At(0).Resource().Attributes().Len()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) + + // adding another envelope matching the same resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom2": "datapoint2", + }, + } + + buildLogs(logs, envelope, observedTime) + + // check that another resource was created + require.Equal(t, 2, logs.ResourceLogs().Len()) + require.Equal(t, 0, logs.ResourceLogs().At(0).Resource().Attributes().Len()) + require.Equal(t, 0, logs.ResourceLogs().At(1).Resource().Attributes().Len()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 1, logs.ResourceLogs().At(1).ScopeLogs().Len()) + require.Equal(t, metadata.ScopeName, logs.ResourceLogs().At(1).ScopeLogs().At(0).Scope().Name()) + require.Equal(t, 1, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) + require.Equal(t, 1, logs.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom2": "datapoint2", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, logs.ResourceLogs().At(1).ScopeLogs().At(0).LogRecords().At(0).Attributes().AsRaw()) +} + +func TestBuildMetricsWithResourceAttrs(t *testing.T) { + metrics := pmetric.NewMetrics() + observedTime := time.Date(2022, time.July, 14, 9, 30, 0, 0, time.UTC) + + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), true)) + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(allowResourceAttributes.ID(), false)) + }) + + // adding the first item to the metrics + + envelope := &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom": "datapoint", + }, + Message: &loggregator_v2.Envelope_Gauge{ + Gauge: &loggregator_v2.Gauge{ + Metrics: map[string]*loggregator_v2.GaugeValue{ + "memory": { + Unit: "bytes", + Value: 17046641.0, + }, + }, + }, + }, + } + + buildMetrics(metrics, envelope, observedTime) + + // check if the first metric resource was created successfully + // we await single resource, with single scope and single metric + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, metrics.ResourceMetrics().At(0).Resource().Attributes().AsRaw()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + }, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) + + // adding another envelope matching the same resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom2": "datapoint2", + }, + Message: &loggregator_v2.Envelope_Gauge{ + Gauge: &loggregator_v2.Gauge{ + Metrics: map[string]*loggregator_v2.GaugeValue{ + "memory": { + Unit: "bytes", + Value: 17046641.0, + }, + }, + }, + }, + } + + buildMetrics(metrics, envelope, observedTime) + + // check that the first metric resource contains a single scope, but 2 metrics + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, metrics.ResourceMetrics().At(0).Resource().Attributes().AsRaw()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 2, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + }, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom2": "datapoint2", + }, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).Gauge().DataPoints().At(0).Attributes().AsRaw()) + + // adding another envelope not matching the resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa33", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom3": "datapoint3", + }, + Message: &loggregator_v2.Envelope_Gauge{ + Gauge: &loggregator_v2.Gauge{ + Metrics: map[string]*loggregator_v2.GaugeValue{ + "memory": { + Unit: "bytes", + Value: 17046641.0, + }, + }, + }, + }, + } + + buildMetrics(metrics, envelope, observedTime) + + // check that the new resource was created and exists next to the original one + require.Equal(t, 2, metrics.ResourceMetrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa33", + }, metrics.ResourceMetrics().At(1).Resource().Attributes().AsRaw()) + require.Equal(t, 1, metrics.ResourceMetrics().At(1).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 1, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom3": "datapoint3", + }, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) +} + +func TestBuildMetrics(t *testing.T) { + metrics := pmetric.NewMetrics() + observedTime := time.Date(2022, time.July, 14, 9, 30, 0, 0, time.UTC) + + // adding the first item to the metrics + + envelope := &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom": "datapoint", + }, + Message: &loggregator_v2.Envelope_Gauge{ + Gauge: &loggregator_v2.Gauge{ + Metrics: map[string]*loggregator_v2.GaugeValue{ + "memory": { + Unit: "bytes", + Value: 17046641.0, + }, + }, + }, + }, + } + + buildMetrics(metrics, envelope, observedTime) + + // check if the first metric resource was created successfully + // we await single resource, with single scope and single metric + require.Equal(t, 1, metrics.ResourceMetrics().Len()) + require.Equal(t, 0, metrics.ResourceMetrics().At(0).Resource().Attributes().Len()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) + + // adding another envelope matching the resource attributes + + envelope = &loggregator_v2.Envelope{ + Timestamp: observedTime.Unix(), + SourceId: "uaa", + Tags: map[string]string{ + "origin": "gorouter", + "deployment": "cf", + "job": "router", + "index": "bc276108-8282-48a5-bae7-c009c4392246", + "ip": "10.244.0.34", + "custom2": "datapoint2", + }, + Message: &loggregator_v2.Envelope_Gauge{ + Gauge: &loggregator_v2.Gauge{ + Metrics: map[string]*loggregator_v2.GaugeValue{ + "memory": { + Unit: "bytes", + Value: 17046641.0, + }, + }, + }, + }, + } + + buildMetrics(metrics, envelope, observedTime) + + // check that another resource was created + require.Equal(t, 2, metrics.ResourceMetrics().Len()) + require.Equal(t, 0, metrics.ResourceMetrics().At(0).Resource().Attributes().Len()) + require.Equal(t, 0, metrics.ResourceMetrics().At(1).Resource().Attributes().Len()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 1, metrics.ResourceMetrics().At(1).ScopeMetrics().Len()) + require.Equal(t, metadata.ScopeName, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Scope().Name()) + require.Equal(t, 1, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom": "datapoint", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) + require.Equal(t, 1, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().Len()) + require.Equal(t, map[string]any{ + "org.cloudfoundry.custom2": "datapoint2", + "org.cloudfoundry.origin": "gorouter", + "org.cloudfoundry.deployment": "cf", + "org.cloudfoundry.job": "router", + "org.cloudfoundry.index": "bc276108-8282-48a5-bae7-c009c4392246", + "org.cloudfoundry.ip": "10.244.0.34", + "org.cloudfoundry.source_id": "uaa", + }, metrics.ResourceMetrics().At(1).ScopeMetrics().At(0).Metrics().At(0).Gauge().DataPoints().At(0).Attributes().AsRaw()) +} From 190321bf459210b3907b3c20680e107573cf277c Mon Sep 17 00:00:00 2001 From: odubajDT Date: Mon, 30 Sep 2024 09:15:13 +0200 Subject: [PATCH 2/4] rename function Signed-off-by: odubajDT --- receiver/cloudfoundryreceiver/converter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/cloudfoundryreceiver/converter.go b/receiver/cloudfoundryreceiver/converter.go index 9b37c12a133a..6577ba4a2d18 100644 --- a/receiver/cloudfoundryreceiver/converter.go +++ b/receiver/cloudfoundryreceiver/converter.go @@ -56,7 +56,7 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme dataPoint.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) if allowResourceAttributes.IsEnabled() { - attrs := getEnvelopeDatapointAttributes(envelope) + attrs := getEnvelopeDataAttributes(envelope) attrs.CopyTo(dataPoint.Attributes()) } else { copyEnvelopeAttributes(dataPoint.Attributes(), envelope) @@ -70,7 +70,7 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme dataPoint.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) dataPoint.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) if allowResourceAttributes.IsEnabled() { - attrs := getEnvelopeDatapointAttributes(envelope) + attrs := getEnvelopeDataAttributes(envelope) attrs.CopyTo(dataPoint.Attributes()) } else { copyEnvelopeAttributes(dataPoint.Attributes(), envelope) @@ -97,7 +97,7 @@ func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogR return fmt.Errorf("unsupported envelope log type: %s", envelope.GetLog().GetType()) } if allowResourceAttributes.IsEnabled() { - attrs := getEnvelopeDatapointAttributes(envelope) + attrs := getEnvelopeDataAttributes(envelope) attrs.CopyTo(log.Attributes()) } else { copyEnvelopeAttributes(log.Attributes(), envelope) @@ -117,7 +117,7 @@ func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Env } } -func getEnvelopeDatapointAttributes(envelope *loggregator_v2.Envelope) pcommon.Map { +func getEnvelopeDataAttributes(envelope *loggregator_v2.Envelope) pcommon.Map { attrs := pcommon.NewMap() for key, value := range envelope.Tags { if !slices.Contains(ResourceAttributesKeys, key) { From f3090b000786a2f1841f2004c68bd12c4cbad5bd Mon Sep 17 00:00:00 2001 From: odubajDT Date: Fri, 18 Oct 2024 07:44:19 +0200 Subject: [PATCH 3/4] polish test cases Signed-off-by: odubajDT --- receiver/cloudfoundryreceiver/receiver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/cloudfoundryreceiver/receiver_test.go b/receiver/cloudfoundryreceiver/receiver_test.go index 9bbe5e1d21c0..b5ce98a5191b 100644 --- a/receiver/cloudfoundryreceiver/receiver_test.go +++ b/receiver/cloudfoundryreceiver/receiver_test.go @@ -116,7 +116,7 @@ func TestGetResourceMetrics(t *testing.T) { expected pmetric.ResourceMetrics }{ { - name: "resource attributes true", + name: "resource attributes false", envelope: loggregator_v2.Envelope{}, resourceAttrs: false, metrics: pmetric.NewMetrics(), @@ -214,7 +214,7 @@ func TestGetResourceLogs(t *testing.T) { expected plog.ResourceLogs }{ { - name: "resource attributes true", + name: "resource attributes false", envelope: loggregator_v2.Envelope{}, resourceAttrs: false, logs: plog.NewLogs(), From 0569f6b4f62e0d5d1fa82f20b609f2c9a010509e Mon Sep 17 00:00:00 2001 From: odubajDT Date: Fri, 20 Dec 2024 07:08:58 +0100 Subject: [PATCH 4/4] adapt versions Signed-off-by: odubajDT --- receiver/cloudfoundryreceiver/README.md | 2 +- receiver/cloudfoundryreceiver/converter.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/cloudfoundryreceiver/README.md b/receiver/cloudfoundryreceiver/README.md index 1e1812a03df9..ce7f85c151c6 100644 --- a/receiver/cloudfoundryreceiver/README.md +++ b/receiver/cloudfoundryreceiver/README.md @@ -143,7 +143,7 @@ The receiver maps the envelope attribute tags to the following OpenTelemetry att The `cloudfoundry.resourceAttributes.allow` [feature gate](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#collector-feature-gates) allows the envelope tags being copied to the metrics as resource attributes instead of datapoint attributes (default `false`). Therefore all `org.cloudfoundry.*` datapoint attributes won't be present anymore on metrics datapoint level, but on resource level instead, since the attributes describe the resource and not the datapoints itself. -The `cloudfoundry.resourceAttributes.allow` feature gate is available since version `v0.109.0` and will be held at least for 2 versions (`v0.111.0`) until promoting to `beta` and another 2 vesions (`v0.113.0`) until promoting to `stable`. +The `cloudfoundry.resourceAttributes.allow` feature gate is available since version `v0.117.0` and will be held at least for 2 versions (`v0.119.0`) until promoting to `beta` and another 2 vesions (`v0.121.0`) until promoting to `stable`. Below you can see the list of attributes that are present the resource level instead of datapoint level (when `cloudfoundry.resourceAttributes.allow` feature gate is enabled): diff --git a/receiver/cloudfoundryreceiver/converter.go b/receiver/cloudfoundryreceiver/converter.go index 6577ba4a2d18..040ae58ae637 100644 --- a/receiver/cloudfoundryreceiver/converter.go +++ b/receiver/cloudfoundryreceiver/converter.go @@ -41,7 +41,7 @@ var allowResourceAttributes = featuregate.GlobalRegistry().MustRegister( "cloudfoundry.resourceAttributes.allow", featuregate.StageAlpha, featuregate.WithRegisterDescription("When enabled, envelope tags are copied to the metrics as resource attributes instead of datapoint attributes"), - featuregate.WithRegisterFromVersion("v0.109.0"), + featuregate.WithRegisterFromVersion("v0.117.0"), ) func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pmetric.MetricSlice, startTime time.Time) {