From 2f2ba8c329900cd188f4b310549852b8db5fe301 Mon Sep 17 00:00:00 2001 From: xu0o0 Date: Wed, 13 Nov 2024 04:31:50 +0800 Subject: [PATCH] [processor/k8sattributes] Add support for profiles signal (#35999) #### Description Add support for profiles signal to `k8sattributesprocessor`. #### Link to tracking issue Fixes #35983 #### Testing - factory_test.go - processor_test.go --- .../f-profiles-k8sattributesprocessor.yaml | 27 ++ processor/k8sattributesprocessor/README.md | 4 +- processor/k8sattributesprocessor/e2e_test.go | 276 ++++++++++++++++-- processor/k8sattributesprocessor/factory.go | 42 ++- .../k8sattributesprocessor/factory_test.go | 9 + processor/k8sattributesprocessor/go.mod | 10 +- processor/k8sattributesprocessor/go.sum | 4 + .../internal/metadata/generated_status.go | 7 +- .../k8sattributesprocessor/metadata.yaml | 1 + processor/k8sattributesprocessor/processor.go | 11 + .../k8sattributesprocessor/processor_test.go | 91 +++++- .../e2e/clusterrbac/collector/deployment.yaml | 1 + .../e2e/mixrbac/collector/deployment.yaml | 1 + .../collector/deployment.yaml | 1 + .../namespacedrbac/collector/deployment.yaml | 1 + 15 files changed, 440 insertions(+), 46 deletions(-) create mode 100644 .chloggen/f-profiles-k8sattributesprocessor.yaml diff --git a/.chloggen/f-profiles-k8sattributesprocessor.yaml b/.chloggen/f-profiles-k8sattributesprocessor.yaml new file mode 100644 index 000000000000..33cc716a263c --- /dev/null +++ b/.chloggen/f-profiles-k8sattributesprocessor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for profiles signal + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35983] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/README.md b/processor/k8sattributesprocessor/README.md index f1d52da19365..cdb82bc759aa 100644 --- a/processor/k8sattributesprocessor/README.md +++ b/processor/k8sattributesprocessor/README.md @@ -2,12 +2,14 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: logs, metrics, traces | +| Stability | [development]: profiles | +| | [beta]: logs, metrics, traces | | Distributions | [contrib], [k8s] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fk8sattributes%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fk8sattributes) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fk8sattributes%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fk8sattributes) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@dmitryax](https://www.github.com/dmitryax), [@fatsheep9146](https://www.github.com/fatsheep9146), [@TylerHelmuth](https://www.github.com/TylerHelmuth) | | Emeritus | [@rmfitzpatrick](https://www.github.com/rmfitzpatrick) | +[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development [beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib [k8s]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-k8s diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 8ca836a0ee8d..553737a894fa 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -21,7 +21,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/pipelineprofiles" "go.opentelemetry.io/collector/receiver/otlpreceiver" + "go.opentelemetry.io/collector/receiver/receiverprofiles" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/multierr" @@ -80,7 +82,8 @@ func TestE2E_ClusterRBAC(t *testing.T) { metricsConsumer := new(consumertest.MetricsSink) tracesConsumer := new(consumertest.TracesSink) logsConsumer := new(consumertest.LogsSink) - shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer) + profilesConsumer := new(consumertest.ProfilesSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) defer shutdownSinks() testID := uuid.NewString()[:8] @@ -89,7 +92,10 @@ func TestE2E_ClusterRBAC(t *testing.T) { ManifestsDir: filepath.Join(testDir, "telemetrygen"), TestID: testID, OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, testNs), - DataTypes: []string{"metrics", "logs", "traces"}, + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: add "profiles" to DataTypes once #36127 is resolved + DataTypes: []string{"metrics", "logs", "traces"}, } telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts) defer func() { @@ -102,8 +108,8 @@ func TestE2E_ClusterRBAC(t *testing.T) { k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) } - wantEntries := 128 // Minimal number of metrics/traces/logs to wait for. - waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer) + wantEntries := 128 // Minimal number of metrics/traces/logs/profiles to wait for. + waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) tcs := []struct { name string @@ -414,6 +420,107 @@ func TestE2E_ClusterRBAC(t *testing.T) { "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), }, }, + { + name: "profiles-job", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-job", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-job-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-job"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "job"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-job"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-statefulset", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-statefulset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset-0"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.statefulset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset"), + "k8s.statefulset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "statefulset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-deployment", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-daemonset", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-daemonset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-daemonset-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.daemonset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-daemonset"), + "k8s.daemonset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "daemonset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-daemonset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, } for _, tc := range tcs { @@ -425,6 +532,8 @@ func TestE2E_ClusterRBAC(t *testing.T) { scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs) case pipeline.SignalLogs: scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs) + case pipelineprofiles.SignalProfiles: + scanProfilesForAttributes(t, profilesConsumer, tc.service, tc.attrs) default: t.Fatalf("unknown data type %s", tc.dataType) } @@ -453,7 +562,8 @@ func TestE2E_NamespacedRBAC(t *testing.T) { metricsConsumer := new(consumertest.MetricsSink) tracesConsumer := new(consumertest.TracesSink) logsConsumer := new(consumertest.LogsSink) - shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer) + profilesConsumer := new(consumertest.ProfilesSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) defer shutdownSinks() testID := uuid.NewString()[:8] @@ -462,7 +572,10 @@ func TestE2E_NamespacedRBAC(t *testing.T) { ManifestsDir: filepath.Join(testDir, "telemetrygen"), TestID: testID, OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, nsName), - DataTypes: []string{"metrics", "logs", "traces"}, + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: add "profiles" to DataTypes once #36127 is resolved + DataTypes: []string{"metrics", "logs", "traces"}, } telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts) defer func() { @@ -475,8 +588,8 @@ func TestE2E_NamespacedRBAC(t *testing.T) { k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) } - wantEntries := 20 // Minimal number of metrics/traces/logs to wait for. - waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer) + wantEntries := 20 // Minimal number of metrics/traces/logs/profiles to wait for. + waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) tcs := []struct { name string @@ -556,6 +669,30 @@ func TestE2E_NamespacedRBAC(t *testing.T) { "container.id": newExpectedValue(exist, ""), }, }, + { + name: "profiles-deployment", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, startTimeRe), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, nsName), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.deployment.uid": newExpectedValue(regex, uidRe), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(regex, uidRe), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + }, + }, } for _, tc := range tcs { @@ -567,6 +704,8 @@ func TestE2E_NamespacedRBAC(t *testing.T) { scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs) case pipeline.SignalLogs: scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs) + case pipelineprofiles.SignalProfiles: + scanProfilesForAttributes(t, profilesConsumer, tc.service, tc.attrs) default: t.Fatalf("unknown data type %s", tc.dataType) } @@ -586,7 +725,8 @@ func TestE2E_MixRBAC(t *testing.T) { metricsConsumer := new(consumertest.MetricsSink) tracesConsumer := new(consumertest.TracesSink) logsConsumer := new(consumertest.LogsSink) - shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer) + profilesConsumer := new(consumertest.ProfilesSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) defer shutdownSinks() var workloadNs, otelNs string @@ -619,7 +759,10 @@ func TestE2E_MixRBAC(t *testing.T) { ManifestsDir: filepath.Join(testDir, "telemetrygen"), TestID: testID, OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, otelNs), - DataTypes: []string{"metrics", "logs", "traces"}, + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: add "profiles" to DataTypes once #36127 is resolved + DataTypes: []string{"metrics", "logs", "traces"}, } telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts) @@ -633,8 +776,8 @@ func TestE2E_MixRBAC(t *testing.T) { k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) } - wantEntries := 20 // Minimal number of metrics/traces/logs to wait for. - waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer) + wantEntries := 20 // Minimal number of metrics/traces/logs/profiles to wait for. + waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) tcs := []struct { name string @@ -723,6 +866,33 @@ func TestE2E_MixRBAC(t *testing.T) { "k8s.cluster.uid": newExpectedValue(regex, uidRe), }, }, + { + name: "profiles-deployment", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, workloadNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.deployment.uid": newExpectedValue(regex, uidRe), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(regex, uidRe), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + }, + }, } for _, tc := range tcs { @@ -734,6 +904,8 @@ func TestE2E_MixRBAC(t *testing.T) { scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs) case pipeline.SignalLogs: scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs) + case pipelineprofiles.SignalProfiles: + scanProfilesForAttributes(t, profilesConsumer, tc.service, tc.attrs) default: t.Fatalf("unknown data type %s", tc.dataType) } @@ -765,7 +937,8 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { metricsConsumer := new(consumertest.MetricsSink) tracesConsumer := new(consumertest.TracesSink) logsConsumer := new(consumertest.LogsSink) - shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer) + profilesConsumer := new(consumertest.ProfilesSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) defer shutdownSinks() testID := uuid.NewString()[:8] @@ -774,7 +947,10 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { ManifestsDir: filepath.Join(testDir, "telemetrygen"), TestID: testID, OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, nsName), - DataTypes: []string{"metrics", "logs", "traces"}, + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: add "profiles" to DataTypes once #36127 is resolved + DataTypes: []string{"metrics", "logs", "traces"}, } telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts) defer func() { @@ -787,8 +963,8 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) } - wantEntries := 20 // Minimal number of metrics/traces/logs to wait for. - waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer) + wantEntries := 20 // Minimal number of metrics/traces/logs/profiles to wait for. + waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) tcs := []struct { name string @@ -868,6 +1044,30 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { "container.id": newExpectedValue(exist, ""), }, }, + { + name: "profiles-deployment", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(shouldnotexist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, startTimeRe), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, nsName), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.deployment.uid": newExpectedValue(regex, uidRe), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(regex, uidRe), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(shouldnotexist, ""), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + }, + }, } for _, tc := range tcs { @@ -879,6 +1079,8 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs) case pipeline.SignalLogs: scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs) + case pipelineprofiles.SignalProfiles: + scanProfilesForAttributes(t, profilesConsumer, tc.service, tc.attrs) default: t.Fatalf("unknown data type %s", tc.dataType) } @@ -949,6 +1151,33 @@ func scanLogsForAttributes(t *testing.T, ls *consumertest.LogsSink, expectedServ t.Fatalf("no logs found for service %s", expectedService) } +func scanProfilesForAttributes(t *testing.T, ps *consumertest.ProfilesSink, expectedService string, + kvs map[string]*expectedValue) { + + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: Remove `t.Skip()` once #36127 is resolved + t.Skip("Skip profiles test") + + // Iterate over the received set of profiles starting from the most recent entries due to a bug in the processor: + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18892 + // TODO: Remove the reverse loop once it's fixed. All the metrics should be properly annotated. + for i := len(ps.AllProfiles()) - 1; i >= 0; i-- { + profiles := ps.AllProfiles()[i] + for i := 0; i < profiles.ResourceProfiles().Len(); i++ { + resource := profiles.ResourceProfiles().At(i).Resource() + service, exist := resource.Attributes().Get("service.name") + assert.True(t, exist, "profile do not has 'service.name' attribute in resource") + if service.AsString() != expectedService { + continue + } + assert.NoError(t, resourceHasAttributes(resource, kvs)) + return + } + } + t.Fatalf("no profiles found for service %s", expectedService) +} + func resourceHasAttributes(resource pcommon.Resource, kvs map[string]*expectedValue) error { foundAttrs := make(map[string]bool) shouldNotFoundAttrs := make(map[string]bool) @@ -998,7 +1227,7 @@ func resourceHasAttributes(resource pcommon.Resource, kvs map[string]*expectedVa return err } -func startUpSinks(t *testing.T, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink) func() { +func startUpSinks(t *testing.T, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink, pc *consumertest.ProfilesSink) func() { f := otlpreceiver.NewFactory() cfg := f.CreateDefaultConfig().(*otlpreceiver.Config) cfg.HTTP = nil @@ -1008,19 +1237,24 @@ func startUpSinks(t *testing.T, mc *consumertest.MetricsSink, tc *consumertest.T require.NoError(t, err, "failed creating metrics receiver") _, err = f.CreateTraces(context.Background(), receivertest.NewNopSettings(), cfg, tc) require.NoError(t, err, "failed creating traces receiver") - rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, lc) + _, err = f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, lc) require.NoError(t, err, "failed creating logs receiver") + rcvr, err := f.(receiverprofiles.Factory).CreateProfiles(context.Background(), receivertest.NewNopSettings(), cfg, pc) + require.NoError(t, err, "failed creating profiles receiver") require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) return func() { assert.NoError(t, rcvr.Shutdown(context.Background())) } } -func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink) { +func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc *consumertest.TracesSink, lc *consumertest.LogsSink, pc *consumertest.ProfilesSink) { timeoutMinutes := 3 require.Eventuallyf(t, func() bool { + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: assert `len(pc.AllProfiles()) > entriesNum` once #36127 is resolved return len(mc.AllMetrics()) > entriesNum && len(tc.AllTraces()) > entriesNum && len(lc.AllLogs()) > entriesNum }, time.Duration(timeoutMinutes)*time.Minute, 1*time.Second, - "failed to receive %d entries, received %d metrics, %d traces, %d logs in %d minutes", entriesNum, - len(mc.AllMetrics()), len(tc.AllTraces()), len(lc.AllLogs()), timeoutMinutes) + "failed to receive %d entries, received %d metrics, %d traces, %d logs, %d profiles in %d minutes", entriesNum, + len(mc.AllMetrics()), len(tc.AllTraces()), len(lc.AllLogs()), len(pc.AllProfiles()), timeoutMinutes) } diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index fe1a46c24c83..f2f05e8bf4ea 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -8,8 +8,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles" + "go.opentelemetry.io/collector/processor/processorprofiles" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube" @@ -22,12 +25,13 @@ var defaultExcludes = ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agen // NewFactory returns a new factory for the k8s processor. func NewFactory() processor.Factory { - return processor.NewFactory( + return processorprofiles.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(createTracesProcessor, metadata.TracesStability), - processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability), - processor.WithLogs(createLogsProcessor, metadata.LogsStability), + processorprofiles.WithTraces(createTracesProcessor, metadata.TracesStability), + processorprofiles.WithMetrics(createMetricsProcessor, metadata.MetricsStability), + processorprofiles.WithLogs(createLogsProcessor, metadata.LogsStability), + processorprofiles.WithProfiles(createProfilesProcessor, metadata.ProfilesStability), ) } @@ -68,6 +72,15 @@ func createMetricsProcessor( return createMetricsProcessorWithOptions(ctx, params, cfg, nextMetricsConsumer) } +func createProfilesProcessor( + ctx context.Context, + params processor.Settings, + cfg component.Config, + nextProfilesConsumer consumerprofiles.Profiles, +) (processorprofiles.Profiles, error) { + return createProfilesProcessorWithOptions(ctx, params, cfg, nextProfilesConsumer) +} + func createTracesProcessorWithOptions( ctx context.Context, set processor.Settings, @@ -128,6 +141,27 @@ func createLogsProcessorWithOptions( processorhelper.WithShutdown(kp.Shutdown)) } +func createProfilesProcessorWithOptions( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextProfilesConsumer consumerprofiles.Profiles, + options ...option, +) (processorprofiles.Profiles, error) { + kp := createKubernetesProcessor(set, cfg, options...) + + return processorhelperprofiles.NewProfiles( + ctx, + set, + cfg, + nextProfilesConsumer, + kp.processProfiles, + processorhelperprofiles.WithCapabilities(consumerCapabilities), + processorhelperprofiles.WithStart(kp.Start), + processorhelperprofiles.WithShutdown(kp.Shutdown), + ) +} + func createKubernetesProcessor( params processor.Settings, cfg component.Config, diff --git a/processor/k8sattributesprocessor/factory_test.go b/processor/k8sattributesprocessor/factory_test.go index 171b06161856..751d4d170cdb 100644 --- a/processor/k8sattributesprocessor/factory_test.go +++ b/processor/k8sattributesprocessor/factory_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" ) @@ -41,6 +42,10 @@ func TestCreateProcessor(t *testing.T) { assert.NotNil(t, lp) assert.NoError(t, err) + pp, err := factory.(processorprofiles.Factory).CreateProfiles(context.Background(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, pp) + assert.NoError(t, err) + oCfg := cfg.(*Config) oCfg.Passthrough = true @@ -56,6 +61,10 @@ func TestCreateProcessor(t *testing.T) { assert.NotNil(t, lp) assert.NoError(t, err) + pp, err = factory.(processorprofiles.Factory).CreateProfiles(context.Background(), params, cfg, consumertest.NewNop()) + assert.NotNil(t, pp) + assert.NoError(t, err) + // Switch it back so other tests run afterwards will not fail on unexpected state kubeClientProvider = realClient } diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index 1cdae9632946..6c2fc0e290af 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -16,13 +16,19 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.113.0 go.opentelemetry.io/collector/confmap v1.19.0 go.opentelemetry.io/collector/consumer v0.113.0 + go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 go.opentelemetry.io/collector/consumer/consumertest v0.113.0 go.opentelemetry.io/collector/featuregate v1.19.0 go.opentelemetry.io/collector/pdata v1.19.0 + go.opentelemetry.io/collector/pdata/pprofile v0.113.0 go.opentelemetry.io/collector/pipeline v0.113.0 + go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.113.0 go.opentelemetry.io/collector/processor v0.113.0 + go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.113.0 + go.opentelemetry.io/collector/processor/processorprofiles v0.113.0 go.opentelemetry.io/collector/processor/processortest v0.113.0 go.opentelemetry.io/collector/receiver/otlpreceiver v0.113.0 + go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0 go.opentelemetry.io/collector/receiver/receivertest v0.113.0 go.opentelemetry.io/collector/semconv v0.113.0 go.opentelemetry.io/otel/metric v1.32.0 @@ -94,15 +100,11 @@ require ( go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect go.opentelemetry.io/collector/config/internal v0.113.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.113.0 // indirect - go.opentelemetry.io/collector/consumer/consumerprofiles v0.113.0 // indirect go.opentelemetry.io/collector/extension v0.113.0 // indirect go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect go.opentelemetry.io/collector/internal/sharedcomponent v0.113.0 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.113.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.113.0 // indirect - go.opentelemetry.io/collector/processor/processorprofiles v0.113.0 // indirect go.opentelemetry.io/collector/receiver v0.113.0 // indirect - go.opentelemetry.io/collector/receiver/receiverprofiles v0.113.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 4e29149eee5b..91c3cc8d645c 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -1279,8 +1279,12 @@ go.opentelemetry.io/collector/pdata/testdata v0.113.0 h1:vRfn85jicO2F4eOTgsWtzmU go.opentelemetry.io/collector/pdata/testdata v0.113.0/go.mod h1:sR+6eR+YEJhYZu9StbqzeWcCmHpfBAgX/qjP82HY9Gw= go.opentelemetry.io/collector/pipeline v0.113.0 h1:vSRzRe3717jV0btCNPhVkhg2lu0uFxcm2VO+vhad/eE= go.opentelemetry.io/collector/pipeline v0.113.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= +go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.113.0 h1:PwQnErsLvEd1x6VIyjLmKQot9huKWqIfEz1kd+8aj4k= +go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.113.0/go.mod h1:tChJYsCG3wc6JPT9aJO3y+32V14NhmCFZOh3k5ORGdQ= go.opentelemetry.io/collector/processor v0.113.0 h1:BQI6MsKtiCG9HT/nmiRXTKP6SZFrjFKVfM6pTQfbc0k= go.opentelemetry.io/collector/processor v0.113.0/go.mod h1:oX91zMI8ZkoaYSUfUYflHiMiiBJPKtODNBUCrETLLd8= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.113.0 h1:bZ1i5l6/4nj7PsLqeHw7Opw5vdrpUsDvuH6a6kx+2yg= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.113.0/go.mod h1:Uxv+5NNIJJCuz52DPFa9INjrpZSfidoTkv849tNp1qI= go.opentelemetry.io/collector/processor/processorprofiles v0.113.0 h1:cczN6whdrCWww3T0FBV3U7lsVKQmkWDX05M+9lANHgk= go.opentelemetry.io/collector/processor/processorprofiles v0.113.0/go.mod h1:4Dmx5qsvujgJ+MC+KqWI7UDVM2liXa3sH/9XnGiL9aE= go.opentelemetry.io/collector/processor/processortest v0.113.0 h1:jGoDJ+tDCzuDcAWZeshQtnK/DQAvMKd4wZAIDgAM5aA= diff --git a/processor/k8sattributesprocessor/internal/metadata/generated_status.go b/processor/k8sattributesprocessor/internal/metadata/generated_status.go index e656980a3605..e93559703fe5 100644 --- a/processor/k8sattributesprocessor/internal/metadata/generated_status.go +++ b/processor/k8sattributesprocessor/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - LogsStability = component.StabilityLevelBeta - MetricsStability = component.StabilityLevelBeta - TracesStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelDevelopment + LogsStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelBeta + TracesStability = component.StabilityLevelBeta ) diff --git a/processor/k8sattributesprocessor/metadata.yaml b/processor/k8sattributesprocessor/metadata.yaml index dd129a014a6f..a388cfcb3dfd 100644 --- a/processor/k8sattributesprocessor/metadata.yaml +++ b/processor/k8sattributesprocessor/metadata.yaml @@ -4,6 +4,7 @@ status: class: processor stability: beta: [logs, metrics, traces] + development: [profiles] distributions: [contrib, k8s] codeowners: active: [dmitryax, fatsheep9146, TylerHelmuth] diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index 40347a55f0ef..5d63cbbd100a 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.8.0" "go.uber.org/zap" @@ -117,6 +118,16 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p return ld, nil } +// processProfiles process profiles and add k8s metadata using resource IP, hostname or incoming IP as pod origin. +func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) { + rp := pd.ResourceProfiles() + for i := 0; i < rp.Len(); i++ { + kp.processResource(ctx, rp.At(i).Resource()) + } + + return pd, nil +} + // processResource adds Pod metadata tags to resource based on pod association configuration func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pcommon.Resource) { podIdentifierValue := extractPodID(ctx, resource.Attributes(), kp.podAssociations) diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go index f7493cc38cdc..b8cbf1ec66b0 100644 --- a/processor/k8sattributesprocessor/processor_test.go +++ b/processor/k8sattributesprocessor/processor_test.go @@ -17,12 +17,15 @@ import ( "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" conventions "go.opentelemetry.io/collector/semconv/v1.8.0" @@ -81,6 +84,19 @@ func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, opti ) } +func newProfilesProcessor(cfg component.Config, nextProfilesConsumer consumerprofiles.Profiles, options ...option) (processorprofiles.Profiles, error) { + opts := options + opts = append(opts, withKubeClientProvider(newFakeClient)) + set := processortest.NewNopSettings() + return createProfilesProcessorWithOptions( + context.Background(), + set, + cfg, + nextProfilesConsumer, + opts..., + ) +} + // withKubeClientProvider sets the specific implementation for getting K8s Client instances func withKubeClientProvider(kcp kube.ClientProvider) option { return func(p *kubernetesprocessor) error { @@ -102,14 +118,17 @@ type multiTest struct { tp processor.Traces mp processor.Metrics lp processor.Logs + pp processorprofiles.Profiles - nextTrace *consumertest.TracesSink - nextMetrics *consumertest.MetricsSink - nextLogs *consumertest.LogsSink + nextTrace *consumertest.TracesSink + nextMetrics *consumertest.MetricsSink + nextLogs *consumertest.LogsSink + nextProfiles *consumertest.ProfilesSink - kpMetrics *kubernetesprocessor - kpTrace *kubernetesprocessor - kpLogs *kubernetesprocessor + kpMetrics *kubernetesprocessor + kpTrace *kubernetesprocessor + kpLogs *kubernetesprocessor + kpProfiles *kubernetesprocessor } func newMultiTest( @@ -119,10 +138,11 @@ func newMultiTest( options ...option, ) *multiTest { m := &multiTest{ - t: t, - nextTrace: new(consumertest.TracesSink), - nextMetrics: new(consumertest.MetricsSink), - nextLogs: new(consumertest.LogsSink), + t: t, + nextTrace: new(consumertest.TracesSink), + nextMetrics: new(consumertest.MetricsSink), + nextLogs: new(consumertest.LogsSink), + nextProfiles: new(consumertest.ProfilesSink), } tp, err := newTracesProcessor(cfg, m.nextTrace, append(options, withExtractKubernetesProcessorInto(&m.kpTrace))...) @@ -161,9 +181,22 @@ func newMultiTest( require.NoError(t, err) } + pp, err := newProfilesProcessor(cfg, m.nextProfiles, append(options, withExtractKubernetesProcessorInto(&m.kpProfiles))...) + require.NoError(t, err) + err = pp.Start(context.Background(), &nopHost{ + reportFunc: func(event *componentstatus.Event) { + errFunc(event.Err()) + }, + }) + if errFunc == nil { + assert.NotNil(t, pp) + require.NoError(t, err) + } + m.tp = tp m.mp = mp m.lp = lp + m.pp = pp return m } @@ -172,12 +205,14 @@ func (m *multiTest) testConsume( traces ptrace.Traces, metrics pmetric.Metrics, logs plog.Logs, + profiles pprofile.Profiles, errFunc func(err error), ) { errs := []error{ m.tp.ConsumeTraces(ctx, traces), m.mp.ConsumeMetrics(ctx, metrics), m.lp.ConsumeLogs(ctx, logs), + m.pp.ConsumeProfiles(ctx, profiles), } for _, err := range errs { @@ -191,24 +226,28 @@ func (m *multiTest) kubernetesProcessorOperation(kpOp func(kp *kubernetesprocess kpOp(m.kpTrace) kpOp(m.kpMetrics) kpOp(m.kpLogs) + kpOp(m.kpProfiles) } func (m *multiTest) assertBatchesLen(batchesLen int) { require.Len(m.t, m.nextTrace.AllTraces(), batchesLen) require.Len(m.t, m.nextMetrics.AllMetrics(), batchesLen) require.Len(m.t, m.nextLogs.AllLogs(), batchesLen) + require.Len(m.t, m.nextProfiles.AllProfiles(), batchesLen) } func (m *multiTest) assertResourceObjectLen(batchNo int) { assert.Equal(m.t, 1, m.nextTrace.AllTraces()[batchNo].ResourceSpans().Len()) assert.Equal(m.t, 1, m.nextMetrics.AllMetrics()[batchNo].ResourceMetrics().Len()) assert.Equal(m.t, 1, m.nextLogs.AllLogs()[batchNo].ResourceLogs().Len()) + assert.Equal(m.t, 1, m.nextProfiles.AllProfiles()[batchNo].ResourceProfiles().Len()) } func (m *multiTest) assertResourceAttributesLen(batchNo int, attrsLen int) { assert.Equal(m.t, attrsLen, m.nextTrace.AllTraces()[batchNo].ResourceSpans().At(0).Resource().Attributes().Len()) assert.Equal(m.t, attrsLen, m.nextMetrics.AllMetrics()[batchNo].ResourceMetrics().At(0).Resource().Attributes().Len()) assert.Equal(m.t, attrsLen, m.nextLogs.AllLogs()[batchNo].ResourceLogs().At(0).Resource().Attributes().Len()) + assert.Equal(m.t, attrsLen, m.nextProfiles.AllProfiles()[batchNo].ResourceProfiles().At(0).Resource().Attributes().Len()) } func (m *multiTest) assertResource(batchNum int, resourceFunc func(res pcommon.Resource)) { @@ -274,6 +313,17 @@ func generateLogs(resourceFunc ...generateResourceFunc) plog.Logs { return l } +func generateProfiles(resourceFunc ...generateResourceFunc) pprofile.Profiles { + p := pprofile.NewProfiles() + ps := p.ResourceProfiles().AppendEmpty() + for _, resFun := range resourceFunc { + res := ps.Resource() + resFun(res) + } + ps.ScopeProfiles().AppendEmpty().Profiles().AppendEmpty() + return p +} + func withPassthroughIP(passthroughIP string) generateResourceFunc { return func(res pcommon.Resource) { res.Attributes().PutStr(kube.K8sIPLabelName, passthroughIP) @@ -345,6 +395,7 @@ func TestIPDetectionFromContext(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -365,6 +416,7 @@ func TestNilBatch(t *testing.T) { ptrace.NewTraces(), pmetric.NewMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -399,6 +451,7 @@ func TestProcessorNoAttrs(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -428,6 +481,7 @@ func TestProcessorNoAttrs(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -445,6 +499,7 @@ func TestProcessorNoAttrs(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -461,7 +516,7 @@ func TestNoIP(t *testing.T) { nil, ) - m.testConsume(context.Background(), generateTraces(), generateMetrics(), generateLogs(), nil) + m.testConsume(context.Background(), generateTraces(), generateMetrics(), generateLogs(), generateProfiles(), nil) m.assertBatchesLen(1) m.assertResourceObjectLen(0) @@ -517,6 +572,7 @@ func TestIPSourceWithoutPodAssociation(t *testing.T) { traces := generateTraces() metrics := generateMetrics() logs := generateLogs() + profiles := generateProfiles() resources := []pcommon.Resource{ traces.ResourceSpans().At(0).Resource(), @@ -532,7 +588,7 @@ func TestIPSourceWithoutPodAssociation(t *testing.T) { } } - m.testConsume(ctx, traces, metrics, logs, nil) + m.testConsume(ctx, traces, metrics, logs, profiles, nil) m.assertBatchesLen(i + 1) m.assertResource(i, func(res pcommon.Resource) { require.Positive(t, res.Attributes().Len()) @@ -607,18 +663,20 @@ func TestIPSourceWithPodAssociation(t *testing.T) { traces := generateTraces() metrics := generateMetrics() logs := generateLogs() + profiles := generateProfiles() resources := []pcommon.Resource{ traces.ResourceSpans().At(0).Resource(), metrics.ResourceMetrics().At(0).Resource(), logs.ResourceLogs().At(0).Resource(), + profiles.ResourceProfiles().At(0).Resource(), } for _, res := range resources { res.Attributes().PutStr(tc.labelName, tc.labelValue) } - m.testConsume(ctx, traces, metrics, logs, nil) + m.testConsume(ctx, traces, metrics, logs, profiles, nil) m.assertBatchesLen(i + 1) m.assertResource(i, func(res pcommon.Resource) { require.Positive(t, res.Attributes().Len()) @@ -659,6 +717,7 @@ func TestPodUID(t *testing.T) { generateTraces(withPodUID("ef10d10b-2da5-4030-812e-5f45c1531227")), generateMetrics(withPodUID("ef10d10b-2da5-4030-812e-5f45c1531227")), generateLogs(withPodUID("ef10d10b-2da5-4030-812e-5f45c1531227")), + generateProfiles(withPodUID("ef10d10b-2da5-4030-812e-5f45c1531227")), nil) m.assertBatchesLen(1) @@ -719,6 +778,7 @@ func TestAddPodLabels(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -796,6 +856,7 @@ func TestAddNamespaceLabels(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -868,6 +929,7 @@ func TestAddNodeLabels(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -933,6 +995,7 @@ func TestAddNodeUID(t *testing.T) { generateTraces(), generateMetrics(), generateLogs(), + generateProfiles(), func(err error) { assert.NoError(t, err) }) @@ -1215,6 +1278,7 @@ func TestProcessorAddContainerAttributes(t *testing.T) { generateTraces(tt.resourceGens...), generateMetrics(tt.resourceGens...), generateLogs(tt.resourceGens...), + generateProfiles(tt.resourceGens...), nil, ) @@ -1266,6 +1330,7 @@ func TestProcessorPicksUpPassthoughPodIp(t *testing.T) { generateTraces(withPassthroughIP("2.2.2.2")), generateMetrics(withPassthroughIP("2.2.2.2")), generateLogs(withPassthroughIP("2.2.2.2")), + generateProfiles(withPassthroughIP("2.2.2.2")), func(err error) { assert.NoError(t, err) }) diff --git a/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/deployment.yaml b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/deployment.yaml index a2011605a3e0..70dc41d4beb4 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/deployment.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/clusterrbac/collector/deployment.yaml @@ -21,6 +21,7 @@ spec: command: - /otelcontribcol - --config=/conf/relay.yaml + - --feature-gates=service.profilesSupport image: "otelcontribcol:latest" imagePullPolicy: Never ports: diff --git a/processor/k8sattributesprocessor/testdata/e2e/mixrbac/collector/deployment.yaml b/processor/k8sattributesprocessor/testdata/e2e/mixrbac/collector/deployment.yaml index 14860e9513fd..36f669cf952b 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/mixrbac/collector/deployment.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/mixrbac/collector/deployment.yaml @@ -21,6 +21,7 @@ spec: command: - /otelcontribcol - --config=/conf/relay.yaml + - --feature-gates=service.profilesSupport image: "otelcontribcol:latest" imagePullPolicy: Never ports: diff --git a/processor/k8sattributesprocessor/testdata/e2e/namespaced_rbac_no_pod_ip/collector/deployment.yaml b/processor/k8sattributesprocessor/testdata/e2e/namespaced_rbac_no_pod_ip/collector/deployment.yaml index bc1ee38dfd38..5b0d166325c8 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/namespaced_rbac_no_pod_ip/collector/deployment.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/namespaced_rbac_no_pod_ip/collector/deployment.yaml @@ -21,6 +21,7 @@ spec: command: - /otelcontribcol - --config=/conf/relay.yaml + - --feature-gates=service.profilesSupport image: "otelcontribcol:latest" imagePullPolicy: Never ports: diff --git a/processor/k8sattributesprocessor/testdata/e2e/namespacedrbac/collector/deployment.yaml b/processor/k8sattributesprocessor/testdata/e2e/namespacedrbac/collector/deployment.yaml index b7ae9cb34342..a0f6ca56d1ff 100644 --- a/processor/k8sattributesprocessor/testdata/e2e/namespacedrbac/collector/deployment.yaml +++ b/processor/k8sattributesprocessor/testdata/e2e/namespacedrbac/collector/deployment.yaml @@ -21,6 +21,7 @@ spec: command: - /otelcontribcol - --config=/conf/relay.yaml + - --feature-gates=service.profilesSupport image: "otelcontribcol:latest" imagePullPolicy: Never ports: