From 04d7a696478171aed10033c24538b0f86448eb83 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 7 Jan 2025 23:39:11 +0100 Subject: [PATCH 01/13] [pkg/ottl] fix handling of nested maps within slices in flatten function (#36204) Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .../ottl-fix-flattening-of-nested-slices.yaml | 27 +++++++++++++ pkg/ottl/e2e/e2e_test.go | 23 +++++------ pkg/ottl/ottlfuncs/func_flatten.go | 39 +++++++++++++------ pkg/ottl/ottlfuncs/func_flatten_test.go | 25 ++++++++++++ 4 files changed, 89 insertions(+), 25 deletions(-) create mode 100644 .chloggen/ottl-fix-flattening-of-nested-slices.yaml diff --git a/.chloggen/ottl-fix-flattening-of-nested-slices.yaml b/.chloggen/ottl-fix-flattening-of-nested-slices.yaml new file mode 100644 index 000000000000..0493417a3fd9 --- /dev/null +++ b/.chloggen/ottl-fix-flattening-of-nested-slices.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: fix handling of nested maps within slices in the `flatten` function + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36162] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. 
+# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index bb1dd43b5282..72077e66dd23 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -71,13 +71,10 @@ func Test_e2e_editors(t *testing.T) { tCtx.GetLogRecord().Attributes().PutStr("foo.nested.test", "pass") tCtx.GetLogRecord().Attributes().Remove("things") - m1 := tCtx.GetLogRecord().Attributes().PutEmptyMap("things.0") - m1.PutStr("name", "foo") - m1.PutInt("value", 2) - - m2 := tCtx.GetLogRecord().Attributes().PutEmptyMap("things.1") - m2.PutStr("name", "bar") - m2.PutInt("value", 5) + tCtx.GetLogRecord().Attributes().PutStr("things.0.name", "foo") + tCtx.GetLogRecord().Attributes().PutInt("things.0.value", 2) + tCtx.GetLogRecord().Attributes().PutStr("things.1.name", "bar") + tCtx.GetLogRecord().Attributes().PutInt("things.1.value", 5) }, }, { @@ -89,6 +86,7 @@ func Test_e2e_editors(t *testing.T) { m.PutStr("test.http.url", "http://localhost/health") m.PutStr("test.flags", "A|B|C") m.PutStr("test.total.string", "123456789") + m.PutStr("test.foo.bar", "pass") m.PutStr("test.foo.flags", "pass") m.PutStr("test.foo.bar", "pass") @@ -96,13 +94,10 @@ func Test_e2e_editors(t *testing.T) { m.PutStr("test.foo.slice.0", "val") m.PutStr("test.foo.nested.test", "pass") - m1 := m.PutEmptyMap("test.things.0") - m1.PutStr("name", "foo") - m1.PutInt("value", 2) - - m2 := m.PutEmptyMap("test.things.1") - m2.PutStr("name", "bar") - m2.PutInt("value", 5) + m.PutStr("test.things.0.name", "foo") + m.PutInt("test.things.0.value", 2) + m.PutStr("test.things.1.name", "bar") + m.PutInt("test.things.1.value", 5) m.CopyTo(tCtx.GetLogRecord().Attributes()) }, }, diff --git a/pkg/ottl/ottlfuncs/func_flatten.go b/pkg/ottl/ottlfuncs/func_flatten.go 
index 709eb1bef746..b651cce4ce5f 100644 --- a/pkg/ottl/ottlfuncs/func_flatten.go +++ b/pkg/ottl/ottlfuncs/func_flatten.go @@ -54,28 +54,45 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O } result := pcommon.NewMap() - flattenHelper(m, result, prefix, 0, depth) + flattenMap(m, result, prefix, 0, depth) result.MoveTo(m) return nil, nil }, nil } -func flattenHelper(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64) { +func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64) { if len(prefix) > 0 { prefix += "." } m.Range(func(k string, v pcommon.Value) bool { - switch { - case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth: - flattenHelper(v.Map(), result, prefix+k, currentDepth+1, maxDepth) - case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth: - for i := 0; i < v.Slice().Len(); i++ { + return flattenValue(k, v, currentDepth, maxDepth, result, prefix) + }) +} + +func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64) { + for i := 0; i < s.Len(); i++ { + flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix) + } +} + +func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string) bool { + switch { + case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth: + flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth) + case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth: + for i := 0; i < v.Slice().Len(); i++ { + switch { + case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth: + flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth) + case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth: + flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), 
currentDepth+2, maxDepth) + default: v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", prefix+k, i))) } - default: - v.CopyTo(result.PutEmpty(prefix + k)) } - return true - }) + default: + v.CopyTo(result.PutEmpty(prefix + k)) + } + return true } diff --git a/pkg/ottl/ottlfuncs/func_flatten_test.go b/pkg/ottl/ottlfuncs/func_flatten_test.go index 39448e6e2fd0..bbd2715e18a3 100644 --- a/pkg/ottl/ottlfuncs/func_flatten_test.go +++ b/pkg/ottl/ottlfuncs/func_flatten_test.go @@ -85,6 +85,31 @@ func Test_flatten(t *testing.T) { "occupants.1": "user 2", }, }, + { + name: "combination with mixed nested slices", + target: map[string]any{ + "name": "test", + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "occupants": []any{ + "user 1", + map[string]any{ + "name": "user 2", + }, + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "name": "test", + "address.street": "first", + "address.house": int64(1234), + "occupants.0": "user 1", + "occupants.1.name": "user 2", + }, + }, { name: "deep nesting", target: map[string]any{ From 363c83709d86bea83bba79e1c5941308b75c84ec Mon Sep 17 00:00:00 2001 From: Christos Markou Date: Wed, 8 Jan 2025 00:40:09 +0200 Subject: [PATCH 02/13] [receiver/receiver_creator] Add support for enabling logs' collecting from K8s hints (#36581) #### Description This PR adds the logs part for https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34427 based on the design decided at https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34427#issuecomment-2436140051. 
See the README docs for the description of this feature: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35617/files#diff-4127365c4062a7510fb7fede0fa239e9232549732898303d94c12fef0433d39d #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34427 #### Testing Added unit-tests #### Documentation Added README section #### How to test this manually 1. Deploy the Collector helm chart: ```yaml mode: daemonset image: repository: otelcontribcol-dev tag: "latest" pullPolicy: IfNotPresent command: name: otelcontribcol clusterRole: create: true rules: - apiGroups: - '' resources: - 'pods' - 'nodes' verbs: - 'get' - 'list' - 'watch' - apiGroups: [ "" ] resources: [ "nodes/proxy"] verbs: [ "get" ] - apiGroups: - "" resources: - nodes/stats verbs: - get - nonResourceURLs: - "/metrics" verbs: - get extraVolumeMounts: - name: varlogpods mountPath: /var/log/pods readOnly: true extraVolumes: - name: varlogpods hostPath: path: /var/log/pods config: extensions: k8s_observer: auth_type: serviceAccount node: ${env:K8S_NODE_NAME} observe_nodes: true exporters: debug: verbosity: detailed receivers: receiver_creator/metrics: watch_observers: [ k8s_observer ] discovery: enabled: true ignore_receivers: - nginx2 receivers: receiver_creator/logs: watch_observers: [ k8s_observer ] discovery: enabled: true default_logs_discovery: false receivers: service: extensions: [health_check, k8s_observer] telemetry: logs: level: INFO pipelines: metrics: receivers: [ receiver_creator/metrics ] processors: [ batch ] exporters: [ debug ] logs/discovery: receivers: [ receiver_creator/logs ] #processors: [ batch ] exporters: [ debug ] ``` 2. 
Then deploy a target Pod with 2 containers: ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: redis-deployment labels: app: redis spec: replicas: 1 selector: matchLabels: app: redis template: metadata: labels: app: redis annotations: io.opentelemetry.discovery.metrics.6379/enabled: "true" io.opentelemetry.discovery.metrics.6379/scraper: redis io.opentelemetry.discovery.metrics.6379/signals: metrics io.opentelemetry.discovery.metrics.6379/config: | collection_interval: "20s" timeout: "10s" io.opentelemetry.discovery.logs.busybox/enabled: "true" io.opentelemetry.discovery.logs.busybox/config: | operators: - id: some type: add field: attributes.tag value: hints spec: containers: - image: redis imagePullPolicy: IfNotPresent name: redis ports: - name: redis containerPort: 6379 protocol: TCP - name: busybox image: busybox args: - /bin/sh - -c - while true; do echo "otel logs at $(date +%H:%M:%S)" && sleep 15s; done ``` 3. Ensure that logs are collected from both containers and that Redis metrics are collected from the Redis container: ```console 2024-11-28T11:04:14.921Z info receivercreator@v0.114.0/observerhandler.go:201 starting receiver {"kind": "receiver", "name": "receiver_creator/metrics", "data_type": "metrics", "name": "redis/91ec7d5c-c6fb-4977-9dbb-c24a85101326_6379", "endpoint": "10.244.0.6:6379", "endpoint_id": "k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/redis(6379)", "config": {"collection_interval":"20s","endpoint":"10.244.0.6:6379","timeout":"10s"}} 2024-11-28T11:04:14.921Z info receivercreator@v0.114.0/observerhandler.go:201 starting receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox", "endpoint": "10.244.0.6", "endpoint_id": "k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox", "config": 
{"include":["/var/log/pods/default_redis-deployment-7777bf7db4-5rm6d_91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox/*.log"],"include_file_name":false,"include_file_path":true,"operators":[{"id":"container-parser","type":"container"},{"field":"attributes.tag","id":"some","type":"add","value":"hints"}]}} 2024-11-28T11:04:14.922Z info adapter/receiver.go:41 Starting stanza receiver {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox/receiver_creator/logs{endpoint=\"10.244.0.6\"}/k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox"} 2024-11-28T11:04:15.122Z info fileconsumer/file.go:265 Started watching file {"kind": "receiver", "name": "receiver_creator/logs", "data_type": "logs", "name": "filelog/91ec7d5c-c6fb-4977-9dbb-c24a85101326_busybox/receiver_creator/logs{endpoint=\"10.244.0.6\"}/k8s_observer/91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox", "component": "fileconsumer", "path": "/var/log/pods/default_redis-deployment-7777bf7db4-5rm6d_91ec7d5c-c6fb-4977-9dbb-c24a85101326/busybox/0.log"} 2024-11-28T11:04:15.979Z info Metrics {"kind": "exporter", "data_type": "metrics", "name": "debug/2", "resource metrics": 1, "metrics": 26, "data points": 31} ``` ### Follow-ups 1. 
File an issue for enhancing default behaviors: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36581#discussion_r1864013358 --------- Signed-off-by: ChrsMark --- .chloggen/f_hints_logs.yaml | 27 ++ receiver/receivercreator/README.md | 99 ++++- receiver/receivercreator/config_test.go | 37 ++ receiver/receivercreator/discovery.go | 98 ++++- receiver/receivercreator/discovery_test.go | 365 +++++++++++++++++- receiver/receivercreator/fixtures_test.go | 16 + receiver/receivercreator/observerhandler.go | 17 +- .../receivercreator/observerhandler_test.go | 70 ++++ 8 files changed, 714 insertions(+), 15 deletions(-) create mode 100644 .chloggen/f_hints_logs.yaml diff --git a/.chloggen/f_hints_logs.yaml b/.chloggen/f_hints_logs.yaml new file mode 100644 index 000000000000..9e4b274aefbf --- /dev/null +++ b/.chloggen/f_hints_logs.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for starting logs' collection based on provided k8s annotations' hints + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34427] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. 
+# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index ac836eb98e02..4fe12d38bcbe 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -458,7 +458,8 @@ receiver_creator/metrics: # ignore_receivers: [] ``` -Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers. +Find bellow the supported annotations that user can define to automatically enable receivers to start +collecting metrics and logs signals from the target Pods/containers. ### Supported metrics annotations @@ -511,11 +512,76 @@ The current implementation relies on the implementation of `k8sobserver` extensi the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go). The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted. +### Supported logs annotations + +This feature enables `filelog` receiver in order to collect logs from the discovered Pods. + +#### Enable/disable discovery + +`io.opentelemetry.discovery.logs/enabled` (Required. Example: `"true"`) + +By default `"false"`. 
+ +#### Define configuration + +The default configuration for the `filelog` receiver is the following: + +```yaml +include: + - /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log +include_file_name: false +include_file_path: true +operators: + - id: container-parser + type: container +``` +This default can be extended or overridden using the respective annotation: +`io.opentelemetry.discovery.logs/config` + +**Example:** + +```yaml +io.opentelemetry.discovery.logs/config: | + include_file_name: true + max_log_size: "2MiB" + operators: + - type: container + id: container-parser + - type: regex_parser + regex: "^(?P