From cc19e6bc65c42fbbbd94dca992803391602ac078 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Thu, 9 Jan 2025 21:21:01 +0100 Subject: [PATCH] [processor/k8sattributes] update internally stored pods with replicaset/deployment information (#37088) #### Description This PR adapts the k8sattributes processor to update its internally stored pods with the information about their respective replica sets and deployments, after the replica set information is received via the informer. This can happen during the initial sync of the processor, where it can happen that the replica set information for existing pods is received after the pods have already been received and stored internally. #### Link to tracking issue Fixes #37056 #### Testing Added unit test #### Documentation --------- Signed-off-by: Florian Bacher --- .chloggen/k8sattributes-deployment-name.yaml | 27 + processor/k8sattributesprocessor/e2e_test.go | 497 ++++++++++++++++++ .../internal/kube/client.go | 29 +- 3 files changed, 540 insertions(+), 13 deletions(-) create mode 100644 .chloggen/k8sattributes-deployment-name.yaml diff --git a/.chloggen/k8sattributes-deployment-name.yaml b/.chloggen/k8sattributes-deployment-name.yaml new file mode 100644 index 000000000000..7492b491faf1 --- /dev/null +++ b/.chloggen/k8sattributes-deployment-name.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: k8sattributesprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Ensure the pods gathered by the processor contain the information about their related replica sets and deployments after the initial sync + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go index 47d57b2c6a29..7de25e62e3b7 100644 --- a/processor/k8sattributesprocessor/e2e_test.go +++ b/processor/k8sattributesprocessor/e2e_test.go @@ -1087,6 +1087,503 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) { } } +// TestE2E_ClusterRBACCollectorRestart tests the k8s attributes processor in a k8s cluster with the collector's service account having +// cluster-wide permissions to list/watch namespaces, nodes, pods and replicasets. The config in the test does not +// set filter::namespace, and the telemetrygen image has a latest tag but no digest. +// The test starts the collector after the telemetrygen objects, to verify the initial sync of the k8sattributesproccessor +// captures all available resources in the cluster in a way that ensures the relations between them (e.g. pod <-> replicaset <-> deployment) are not lost. +// The test requires a prebuilt otelcontribcol image uploaded to a kind k8s cluster defined in +// `/tmp/kube-config-otelcol-e2e-testing`. Run the following command prior to running the test locally: +// +// kind create cluster --kubeconfig=/tmp/kube-config-otelcol-e2e-testing +// make docker-otelcontribcol +// KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest +func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) { + testDir := filepath.Join("testdata", "e2e", "clusterrbac") + + k8sClient, err := k8stest.NewK8sClient(testKubeConfig) + require.NoError(t, err) + + nsFile := filepath.Join(testDir, "namespace.yaml") + buf, err := os.ReadFile(nsFile) + require.NoErrorf(t, err, "failed to read namespace object file %s", nsFile) + nsObj, err := k8stest.CreateObject(k8sClient, buf) + require.NoErrorf(t, err, "failed to create k8s namespace from file %s", nsFile) + + testNs := nsObj.GetName() + defer func() { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, nsObj), "failed to delete namespace %s", testNs) + }() + + metricsConsumer := new(consumertest.MetricsSink) + tracesConsumer := new(consumertest.TracesSink) + logsConsumer := new(consumertest.LogsSink) + profilesConsumer := new(consumertest.ProfilesSink) + shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) + defer shutdownSinks() + + testID := uuid.NewString()[:8] + createTeleOpts := &k8stest.TelemetrygenCreateOpts{ + ManifestsDir: filepath.Join(testDir, "telemetrygen"), + TestID: testID, + OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, testNs), + // `telemetrygen` doesn't support profiles + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36127 + // TODO: add "profiles" to DataTypes once #36127 is resolved + DataTypes: []string{"metrics", "logs", "traces"}, + } + telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts) + defer func() { + for _, obj := range telemetryGenObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + for _, info := range telemetryGenObjInfos { + k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType) + } + + // start the collector after the telemetry gen objects + collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(testDir, "collector")) + defer func() { + for _, obj := range collectorObjs { + require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName()) + } + }() + + wantEntries := 128 // Minimal number of metrics/traces/logs/profiles to wait for. + waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer) + + tcs := []struct { + name string + dataType pipeline.Signal + service string + attrs map[string]*expectedValue + }{ + { + name: "traces-job", + dataType: pipeline.SignalTraces, + service: "test-traces-job", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-job-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-job"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "job"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-job"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "traces-statefulset", + dataType: pipeline.SignalTraces, + service: "test-traces-statefulset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-statefulset-0"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.statefulset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-statefulset"), + "k8s.statefulset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "statefulset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-statefulset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "traces-deployment", + dataType: pipeline.SignalTraces, + service: "test-traces-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "traces-daemonset", + dataType: pipeline.SignalTraces, + service: "test-traces-daemonset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-traces-daemonset-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.daemonset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-daemonset"), + "k8s.daemonset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "daemonset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-daemonset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "metrics-job", + dataType: pipeline.SignalMetrics, + service: "test-metrics-job", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-job-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-job"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "job"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-job"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "metrics-statefulset", + dataType: pipeline.SignalMetrics, + service: "test-metrics-statefulset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-statefulset-0"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.statefulset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-statefulset"), + "k8s.statefulset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "statefulset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-statefulset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "metrics-deployment", + dataType: pipeline.SignalMetrics, + service: "test-metrics-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "metrics-daemonset", + dataType: pipeline.SignalMetrics, + service: "test-metrics-daemonset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-metrics-daemonset-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.daemonset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-daemonset"), + "k8s.daemonset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "daemonset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-daemonset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "logs-job", + dataType: pipeline.SignalLogs, + service: "test-logs-job", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-job-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-job"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "job"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-job"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "logs-statefulset", + dataType: pipeline.SignalLogs, + service: "test-logs-statefulset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-statefulset-0"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.statefulset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-statefulset"), + "k8s.statefulset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "statefulset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-statefulset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "logs-deployment", + dataType: pipeline.SignalLogs, + service: "test-logs-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "logs-daemonset", + dataType: pipeline.SignalLogs, + service: "test-logs-daemonset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-logs-daemonset-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.daemonset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-daemonset"), + "k8s.daemonset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "daemonset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-daemonset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-job", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-job", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-job-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.job.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-job"), + "k8s.job.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "job"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-job"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-statefulset", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-statefulset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset-0"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.statefulset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset"), + "k8s.statefulset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "statefulset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-statefulset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-deployment", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-deployment", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.deployment.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.deployment.uid": newExpectedValue(exist, ""), + "k8s.replicaset.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-deployment-[a-z0-9]*"), + "k8s.replicaset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "deployment"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-deployment"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + { + name: "profiles-daemonset", + dataType: pipelineprofiles.SignalProfiles, + service: "test-profiles-daemonset", + attrs: map[string]*expectedValue{ + "k8s.pod.name": newExpectedValue(regex, "telemetrygen-"+testID+"-profiles-daemonset-[a-z0-9]*"), + "k8s.pod.ip": newExpectedValue(exist, ""), + "k8s.pod.uid": newExpectedValue(regex, uidRe), + "k8s.pod.start_time": newExpectedValue(exist, ""), + "k8s.node.name": newExpectedValue(exist, ""), + "k8s.namespace.name": newExpectedValue(equal, testNs), + "k8s.daemonset.name": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-daemonset"), + "k8s.daemonset.uid": newExpectedValue(exist, ""), + "k8s.annotations.workload": newExpectedValue(equal, "daemonset"), + "k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-profiles-daemonset"), + "k8s.container.name": newExpectedValue(equal, "telemetrygen"), + "k8s.cluster.uid": newExpectedValue(regex, uidRe), + "container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"), + "container.image.repo_digests": newExpectedValue(regex, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen@sha256:[0-9a-fA-f]{64}"), + "container.image.tag": newExpectedValue(equal, "latest"), + "container.id": newExpectedValue(exist, ""), + "k8s.node.labels.foo": newExpectedValue(equal, "too"), + "k8s.namespace.labels.foons": newExpectedValue(equal, "barns"), + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + switch tc.dataType { + case pipeline.SignalTraces: + scanTracesForAttributes(t, tracesConsumer, tc.service, tc.attrs) + case pipeline.SignalMetrics: + scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs) + case pipeline.SignalLogs: + scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs) + case pipelineprofiles.SignalProfiles: + scanProfilesForAttributes(t, profilesConsumer, tc.service, tc.attrs) + default: + t.Fatalf("unknown data type %s", tc.dataType) + } + }) + } +} + func scanTracesForAttributes(t *testing.T, ts *consumertest.TracesSink, expectedService string, kvs map[string]*expectedValue, ) { diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index a984e7f85cbd..a589d5a1c1f5 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -209,6 +209,22 @@ func New( // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes. func (c *WatchClient) Start() error { synced := make([]cache.InformerSynced, 0) + + // start the replicaSet informer first, as the replica sets need to be + // present at the time the pods are handled, to correctly establish the connection between pods and deployments + if c.Rules.DeploymentName || c.Rules.DeploymentUID { + reg, err := c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.handleReplicaSetAdd, + UpdateFunc: c.handleReplicaSetUpdate, + DeleteFunc: c.handleReplicaSetDelete, + }) + if err != nil { + return err + } + synced = append(synced, reg.HasSynced) + go c.replicasetInformer.Run(c.stopCh) + } + reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handlePodAdd, UpdateFunc: c.handlePodUpdate, @@ -231,19 +247,6 @@ func (c *WatchClient) Start() error { synced = append(synced, reg.HasSynced) go c.namespaceInformer.Run(c.stopCh) - if c.Rules.DeploymentName || c.Rules.DeploymentUID { - reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.handleReplicaSetAdd, - UpdateFunc: c.handleReplicaSetUpdate, - DeleteFunc: c.handleReplicaSetDelete, - }) - if err != nil { - return err - } - synced = append(synced, reg.HasSynced) - go c.replicasetInformer.Run(c.stopCh) - } - if c.nodeInformer != nil { reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.handleNodeAdd,