diff --git a/internal/backendconnection/otelcolresources/collector_config_map.go b/internal/backendconnection/otelcolresources/collector_config_maps.go similarity index 66% rename from internal/backendconnection/otelcolresources/collector_config_map.go rename to internal/backendconnection/otelcolresources/collector_config_maps.go index 1e0f996..971b259 100644 --- a/internal/backendconnection/otelcolresources/collector_config_map.go +++ b/internal/backendconnection/otelcolresources/collector_config_maps.go @@ -24,29 +24,47 @@ type OtlpExporter struct { } var ( - //go:embed config.yaml.template - collectorConfigurationTemplateSource string - collectorConfigurationTemplate = template.Must( - template.New("collector-configuration").Parse(collectorConfigurationTemplateSource)) + //go:embed daemonset.config.yaml.template + daemonSetCollectorConfigurationTemplateSource string + daemonSetCollectorConfigurationTemplate = template.Must( + template.New("daemonset-collector-configuration").Parse(daemonSetCollectorConfigurationTemplateSource)) + + ////go:embed deployment.config.yaml.template + //deploymentCollectorConfigurationTemplateSource string + //deploymentCollectorConfigurationTemplate = template.Must( + // template.New("deployment-collector-configuration").Parse(deploymentCollectorConfigurationTemplateSource)) + authHeaderValue = fmt.Sprintf("Bearer ${env:%s}", authTokenEnvVarName) ) -func assembleCollectorConfigMap(config *oTelColConfig) (*corev1.ConfigMap, error) { +func assembleDaemonSetCollectorConfigMap(config *oTelColConfig) (*corev1.ConfigMap, error) { + return assembleCollectorConfigMap(config, daemonSetCollectorConfigurationTemplate) +} + +//func assembleDeploymentCollectorConfigMap(config *oTelColConfig) (*corev1.ConfigMap, error) { +// return assembleCollectorConfigMap(config, deploymentCollectorConfigurationTemplate) +//} + +func assembleCollectorConfigMap( + config *oTelColConfig, + template *template.Template, +) (*corev1.ConfigMap, error) { exporters, err := ConvertExportSettingsToExporterList(config.Export) if err != nil { return nil, fmt.Errorf("cannot assemble the exporters for the configuration: %w", err) } - collectorConfiguration, err := renderCollectorConfigs(&collectorConfigurationTemplateValues{ - Exporters: exporters, - IgnoreLogsFromNamespaces: []string{ - // Skipping kube-system, it requires bespoke filtering work - "kube-system", - // Skipping logs from the operator and the daemonset, otherwise - // logs will compound in case of log parsing errors - config.Namespace, - }, - DevelopmentMode: config.DevelopmentMode, - }) + collectorConfiguration, err := renderCollectorConfiguration(template, + &collectorConfigurationTemplateValues{ + Exporters: exporters, + IgnoreLogsFromNamespaces: []string{ + // Skipping kube-system, it requires bespoke filtering work + "kube-system", + // Skipping logs from the operator and the daemonset, otherwise + // logs will compound in case of log parsing errors + config.Namespace, + }, + DevelopmentMode: config.DevelopmentMode, + }) if err != nil { return nil, fmt.Errorf("cannot render the collector configuration template: %w", err) } @@ -137,11 +155,13 @@ func ConvertExportSettingsToExporterList(export dash0v1alpha1.Export) ([]OtlpExp return exporters, nil } -func renderCollectorConfigs(templateValues *collectorConfigurationTemplateValues) (string, error) { +func renderCollectorConfiguration( + template *template.Template, + templateValues *collectorConfigurationTemplateValues, +) (string, error) { var collectorConfiguration bytes.Buffer - if err := collectorConfigurationTemplate.Execute(&collectorConfiguration, templateValues); err != nil { + if err := template.Execute(&collectorConfiguration, templateValues); err != nil { return "", err } - return collectorConfiguration.String(), nil } diff --git a/internal/backendconnection/otelcolresources/collector_config_map_test.go b/internal/backendconnection/otelcolresources/collector_config_maps_test.go similarity index 80% rename from internal/backendconnection/otelcolresources/collector_config_map_test.go rename to internal/backendconnection/otelcolresources/collector_config_maps_test.go index e8c8e7a..21657fd 100644 --- a/internal/backendconnection/otelcolresources/collector_config_map_test.go +++ b/internal/backendconnection/otelcolresources/collector_config_maps_test.go @@ -30,7 +30,7 @@ var ( var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { It("should fail if no exporter is configured", func() { - _, err := assembleCollectorConfigMap(&oTelColConfig{ + _, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{}, @@ -39,7 +39,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { }) It("should fail to render the Dash0 exporter when no endpoint is provided", func() { - _, err := assembleCollectorConfigMap(&oTelColConfig{ + _, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -58,7 +58,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { }) It("should render the Dash0 exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -76,13 +76,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(2)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(1)) - exporter2 := exporters["otlp/dash0"] - Expect(exporter2).ToNot(BeNil()) - dash0OtlpExporter := exporter2.(map[string]interface{}) + exporter := exporters["otlp/dash0"] + Expect(exporter).ToNot(BeNil()) + dash0OtlpExporter := exporter.(map[string]interface{}) Expect(dash0OtlpExporter).ToNot(BeNil()) Expect(dash0OtlpExporter["endpoint"]).To(Equal(EndpointDash0Test)) headersRaw := dash0OtlpExporter["headers"] @@ -93,11 +91,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers[util.Dash0DatasetHeaderName]).To(BeNil()) Expect(dash0OtlpExporter["encoding"]).To(BeNil()) - verifyPipelines(collectorConfig, "otlp/dash0") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/dash0") }) It("should render the Dash0 exporter with custom dataset", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -116,13 +114,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(2)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(1)) - exporter2 := exporters["otlp/dash0"] - Expect(exporter2).ToNot(BeNil()) - dash0OtlpExporter := exporter2.(map[string]interface{}) + exporter := exporters["otlp/dash0"] + Expect(exporter).ToNot(BeNil()) + dash0OtlpExporter := exporter.(map[string]interface{}) Expect(dash0OtlpExporter).ToNot(BeNil()) Expect(dash0OtlpExporter["endpoint"]).To(Equal(EndpointDash0Test)) headersRaw := dash0OtlpExporter["headers"] @@ -133,11 +129,54 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers[util.Dash0DatasetHeaderName]).To(Equal("custom-dataset")) Expect(dash0OtlpExporter["encoding"]).To(BeNil()) - verifyPipelines(collectorConfig, "otlp/dash0") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/dash0") + }) + + It("should render a verbose debug exporter in development mode", func() { + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ + Namespace: namespace, + NamePrefix: namePrefix, + Export: dash0v1alpha1.Export{ + Dash0: &dash0v1alpha1.Dash0Configuration{ + Endpoint: EndpointDash0Test, + Authorization: dash0v1alpha1.Authorization{ + Token: &AuthorizationTokenTest, + }, + }, + }, + DevelopmentMode: true, + }) + + Expect(err).ToNot(HaveOccurred()) + collectorConfig := parseConfigMapContent(configMap) + exportersRaw := collectorConfig["exporters"] + Expect(exportersRaw).ToNot(BeNil()) + exporters := exportersRaw.(map[string]interface{}) + Expect(exporters).To(HaveLen(2)) + + debugExporterRaw := exporters["debug"] + Expect(debugExporterRaw).ToNot(BeNil()) + debugExporter := debugExporterRaw.(map[string]interface{}) + Expect(debugExporter["verbosity"]).To(Equal("detailed")) + + exporter := exporters["otlp/dash0"] + Expect(exporter).ToNot(BeNil()) + dash0OtlpExporter := exporter.(map[string]interface{}) + Expect(dash0OtlpExporter).ToNot(BeNil()) + Expect(dash0OtlpExporter["endpoint"]).To(Equal(EndpointDash0Test)) + headersRaw := dash0OtlpExporter["headers"] + Expect(headersRaw).ToNot(BeNil()) + headers := headersRaw.(map[string]interface{}) + Expect(headers).To(HaveLen(1)) + Expect(headers[util.AuthorizationHeaderName]).To(Equal(bearerWithAuthToken)) + Expect(headers[util.Dash0DatasetHeaderName]).To(BeNil()) + Expect(dash0OtlpExporter["encoding"]).To(BeNil()) + + verifyDownstreamExportersInPipelines(collectorConfig, "debug", "otlp/dash0") }) It("should fail to render a gRPC exporter when no endpoint is provided", func() { - _, err := assembleCollectorConfigMap(&oTelColConfig{ + _, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -157,7 +196,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { }) It("should render an arbitrary gRPC exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -182,9 +221,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(2)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(1)) exporter2 := exporters["otlp/grpc"] Expect(exporter2).ToNot(BeNil()) @@ -199,11 +236,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key2"]).To(Equal("Value2")) Expect(otlpGrpcExporter["encoding"]).To(BeNil()) - verifyPipelines(collectorConfig, "otlp/grpc") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/grpc") }) It("should fail to render an HTTP exporter when no endpoint is provided", func() { - _, err := assembleCollectorConfigMap(&oTelColConfig{ + _, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -223,7 +260,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { }) It("should fail to render an HTTP exporter when no encoding is provided", func() { - _, err := assembleCollectorConfigMap(&oTelColConfig{ + _, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -244,7 +281,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { }) It("should render an arbitrary HTTP exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -270,9 +307,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(2)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(1)) exporter2 := exporters["otlphttp/json"] Expect(exporter2).ToNot(BeNil()) @@ -287,11 +322,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key2"]).To(Equal("Value2")) Expect(otlpHttpExporter["encoding"]).To(Equal("json")) - verifyPipelines(collectorConfig, "otlphttp/json") + verifyDownstreamExportersInPipelines(collectorConfig, "otlphttp/json") }) It("should render the Dash0 exporter together with a gRPC exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -316,9 +351,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(3)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(2)) exporter2 := exporters["otlp/dash0"] Expect(exporter2).ToNot(BeNil()) @@ -343,11 +376,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key1"]).To(Equal("Value1")) Expect(httpExporter["encoding"]).To(BeNil()) - verifyPipelines(collectorConfig, "otlp/dash0", "otlp/grpc") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/dash0", "otlp/grpc") }) It("should render the Dash0 exporter together with an HTTP exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -373,9 +406,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(3)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(2)) exporter2 := exporters["otlp/dash0"] Expect(exporter2).ToNot(BeNil()) @@ -400,11 +431,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key1"]).To(Equal("Value1")) Expect(httpExporter["encoding"]).To(Equal("proto")) - verifyPipelines(collectorConfig, "otlp/dash0", "otlphttp/proto") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/dash0", "otlphttp/proto") }) It("should render a gRPC exporter together with an HTTP exporter", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -431,9 +462,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { exportersRaw := collectorConfig["exporters"] Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) - Expect(exporters).To(HaveLen(3)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + Expect(exporters).To(HaveLen(2)) exporter2 := exporters["otlp/grpc"] Expect(exporter2).ToNot(BeNil()) @@ -459,11 +488,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key2"]).To(Equal("Value2")) Expect(httpExporter["encoding"]).To(Equal("proto")) - verifyPipelines(collectorConfig, "otlp/grpc", "otlphttp/proto") + verifyDownstreamExportersInPipelines(collectorConfig, "otlp/grpc", "otlphttp/proto") }) It("should render a combination of all three exporter types", func() { - configMap, err := assembleCollectorConfigMap(&oTelColConfig{ + configMap, err := assembleDaemonSetCollectorConfigMap(&oTelColConfig{ Namespace: namespace, NamePrefix: namePrefix, Export: dash0v1alpha1.Export{ @@ -489,6 +518,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Encoding: dash0v1alpha1.Json, }, }, + DevelopmentMode: true, }) Expect(err).ToNot(HaveOccurred()) @@ -497,8 +527,11 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(exportersRaw).ToNot(BeNil()) exporters := exportersRaw.(map[string]interface{}) Expect(exporters).To(HaveLen(4)) - debugExporter := exporters["debug"] - Expect(debugExporter).ToNot(BeNil()) + + debugExporterRaw := exporters["debug"] + Expect(debugExporterRaw).ToNot(BeNil()) + debugExporter := debugExporterRaw.(map[string]interface{}) + Expect(debugExporter["verbosity"]).To(Equal("detailed")) exporter2 := exporters["otlp/dash0"] Expect(exporter2).ToNot(BeNil()) @@ -533,7 +566,7 @@ var _ = Describe("The OpenTelemetry Collector ConfigMap conent", func() { Expect(headers["Key2"]).To(Equal("Value2")) Expect(httpExporter["encoding"]).To(Equal("json")) - verifyPipelines(collectorConfig, "otlp/dash0", "otlp/grpc", "otlphttp/json") + verifyDownstreamExportersInPipelines(collectorConfig, "debug", "otlp/dash0", "otlp/grpc", "otlphttp/json") }) }) @@ -545,16 +578,19 @@ func parseConfigMapContent(configMap *corev1.ConfigMap) map[string]interface{} { return *configMapParsed } -func verifyPipelines(collectorConfig map[string]interface{}, expectedExporters ...string) { +func verifyDownstreamExportersInPipelines(collectorConfig map[string]interface{}, expectedExporters ...string) { pipelines := ((collectorConfig["service"]).(map[string]interface{})["pipelines"]).(map[string]interface{}) Expect(pipelines).ToNot(BeNil()) tracesPipeline := (pipelines["traces/downstream"]).(map[string]interface{}) tracesExporters := (tracesPipeline["exporters"]).([]interface{}) + Expect(tracesExporters).To(HaveLen(len(expectedExporters))) Expect(tracesExporters).To(ContainElements(expectedExporters)) metricsPipeline := (pipelines["metrics/downstream"]).(map[string]interface{}) metricsExporters := (metricsPipeline["exporters"]).([]interface{}) + Expect(metricsExporters).To(HaveLen(len(expectedExporters))) Expect(metricsExporters).To(ContainElements(expectedExporters)) logsPipeline := (pipelines["logs/downstream"]).(map[string]interface{}) logsExporters := (logsPipeline["exporters"]).([]interface{}) Expect(logsExporters).To(ContainElements(expectedExporters)) + Expect(logsExporters).To(HaveLen(len(expectedExporters))) } diff --git a/internal/backendconnection/otelcolresources/config.yaml.template b/internal/backendconnection/otelcolresources/daemonset.config.yaml.template similarity index 97% rename from internal/backendconnection/otelcolresources/config.yaml.template rename to internal/backendconnection/otelcolresources/daemonset.config.yaml.template index d8b492c..0eb8152 100644 --- a/internal/backendconnection/otelcolresources/config.yaml.template +++ b/internal/backendconnection/otelcolresources/daemonset.config.yaml.template @@ -5,8 +5,6 @@ exporters: {{- if .DevelopmentMode }} debug: verbosity: detailed -{{- else }} - debug: {} {{- end }} {{- range $i, $exporter := .Exporters }} {{ $exporter.Name }}: @@ -188,6 +186,9 @@ service: - memory_limiter - batch exporters: + {{- if .DevelopmentMode }} + - debug + {{- end }} {{- range $i, $exporter := .Exporters }} - {{ $exporter.Name }} {{- end }} @@ -201,6 +202,9 @@ service: - memory_limiter - batch exporters: + {{- if .DevelopmentMode }} + - debug + {{- end }} {{- range $i, $exporter := .Exporters }} - {{ $exporter.Name }} {{- end }} @@ -229,7 +233,9 @@ service: - memory_limiter - batch exporters: + {{- if .DevelopmentMode }} - debug + {{- end }} {{- range $i, $exporter := .Exporters }} - {{ $exporter.Name }} {{- end }} diff --git a/internal/backendconnection/otelcolresources/deployment.config.yaml.template b/internal/backendconnection/otelcolresources/deployment.config.yaml.template new file mode 100644 index 0000000..db04b49 --- /dev/null +++ b/internal/backendconnection/otelcolresources/deployment.config.yaml.template @@ -0,0 +1,69 @@ +exporters: +{{- if .DevelopmentMode }} + debug: + verbosity: detailed +{{- else }} + debug: {} +{{- end }} +{{- range $i, $exporter := .Exporters }} + {{ $exporter.Name }}: + endpoint: "{{ $exporter.Endpoint }}" +{{- if $exporter.Headers }} + headers: +{{- range $i, $header := $exporter.Headers }} + "{{ $header.Name }}": "{{ $header.Value }}" +{{- end }} +{{- end }} +{{- if $exporter.Encoding }} + encoding: "{{ $exporter.Encoding }}" +{{- end }} +{{- end }} + +extensions: + health_check: + endpoint: ${env:MY_POD_IP}:13133 + +processors: + batch: {} + + memory_limiter: + check_interval: 5s + limit_percentage: 80 + spike_limit_percentage: 25 + + resourcedetection: + detectors: + - env + - system + - eks + - ecs + - ec2 + - gcp + - aks + - azure + - k8snode + +receivers: + k8s_cluster: {} + +service: + extensions: + - health_check + + pipelines: + + metrics/downstream: + receivers: + - k8s_cluster + processors: + - memory_limiter + - resourcedetection + - batch + exporters: + {{- range $i, $exporter := .Exporters }} + - {{ $exporter.Name }} + {{- end }} + + telemetry: + metrics: + address: ${env:MY_POD_IP}:8888 \ No newline at end of file diff --git a/internal/backendconnection/otelcolresources/desired_state.go b/internal/backendconnection/otelcolresources/desired_state.go index 922e587..bf7d174 100644 --- a/internal/backendconnection/otelcolresources/desired_state.go +++ b/internal/backendconnection/otelcolresources/desired_state.go @@ -44,6 +44,11 @@ const ( // ports. When the operator creates its daemonset, the pods of one of the two otelcol daemonsets would fail to start // due to port conflicts. + otlpGrpcPort = 4317 + otlpHttpPort = 4318 + + probesHttpPort = 13133 + rbacApiVersion = "rbac.authorization.k8s.io/v1" serviceComponent = "agent-collector" @@ -66,18 +71,12 @@ const ( authTokenEnvVarName = "AUTH_TOKEN" - collectorConfigurationYaml = "config.yaml" - - pidFileVolumeName = "opentelemetry-collector-pidfile" - - offsetsDirPath = "/var/otelcol/filelogreceiver_offsets" -) - -const ( - otlpGrpcPort = 4317 - otlpHttpPort = 4318 + collectorConfigurationYaml = "config.yaml" + collectorConfigurationFilePath = "/etc/otelcol/conf/" + collectorConfigurationYaml - probesHttpPort = 13133 + collectorPidFilePath = "/etc/otelcol/run/pid.file" + pidFileVolumeName = "opentelemetry-collector-pidfile" + offsetsDirPath = "/var/otelcol/filelogreceiver_offsets" ) var ( @@ -86,27 +85,88 @@ var ( appKubernetesIoInstanceKey: appKubernetesIoInstanceValue, appKubernetesIoComponentLabelKey: serviceComponent, } + + nodeNameFieldSpec = corev1.ObjectFieldSelector{ + FieldPath: "spec.nodeName", + } + podUidFieldSpec = corev1.ObjectFieldSelector{ + FieldPath: "metadata.uid", + } + k8sNodeNameEnvVar = corev1.EnvVar{ + Name: "K8S_NODE_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &nodeNameFieldSpec, + }, + } + k8sPodUidEnvVar = corev1.EnvVar{ + Name: "K8S_POD_UID", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &podUidFieldSpec, + }, + } + + configMapItems = []corev1.KeyToPath{{ + Key: collectorConfigurationYaml, + Path: collectorConfigurationYaml, + }} + + collectorConfigVolume = corev1.VolumeMount{ + Name: "opentelemetry-collector-configmap", + MountPath: "/etc/otelcol/conf", + ReadOnly: true, + } + collectorPidFileMountRW = corev1.VolumeMount{ + Name: pidFileVolumeName, + MountPath: filepath.Dir(collectorPidFilePath), + ReadOnly: false, + } + filelogReceiverOffsetsVolumeMount = corev1.VolumeMount{ + Name: "filelogreceiver-offsets", + MountPath: offsetsDirPath, + ReadOnly: false, + } + + collectorProbe = corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/", + Port: intstr.FromInt32(probesHttpPort), + }, + }, + } ) func assembleDesiredState(config *oTelColConfig) ([]client.Object, error) { var desiredState []client.Object desiredState = append(desiredState, serviceAccount(config)) - collectorConfigMap, err := assembleCollectorConfigMap(config) + daemonSetCollectorConfigMap, err := assembleDaemonSetCollectorConfigMap(config) if err != nil { return desiredState, err } - desiredState = append(desiredState, collectorConfigMap) + desiredState = append(desiredState, daemonSetCollectorConfigMap) desiredState = append(desiredState, assembleFilelogOffsetsConfigMap(config)) desiredState = append(desiredState, assembleClusterRole(config)) desiredState = append(desiredState, assembleClusterRoleBinding(config)) desiredState = append(desiredState, assembleRole(config)) desiredState = append(desiredState, assembleRoleBinding(config)) desiredState = append(desiredState, assembleService(config)) - collectorDaemonSet, err := assembleDaemonSet(config) + collectorDaemonSet, err := assembleCollectorDaemonSet(config) if err != nil { return desiredState, err } desiredState = append(desiredState, collectorDaemonSet) + + //deploymentCollectorConfigMap, err := assembleDeploymentCollectorConfigMap(config) + //if err != nil { + // return desiredState, err + //} + //desiredState = append(desiredState, deploymentCollectorConfigMap) + //collectorDeployment, err := assembleCollectorDeployment(config) + //if err != nil { + // return desiredState, err + //} + // desiredState = append(desiredState, collectorDeployment) + return desiredState, nil } @@ -281,17 +341,114 @@ func assembleService(config *oTelColConfig) *corev1.Service { } } -func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { - configMapItems := []corev1.KeyToPath{{ - Key: collectorConfigurationYaml, - Path: collectorConfigurationYaml, - }} +func assembleCollectorDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { + collectorContainer, err := assembleCollectorContainer(config) + if err != nil { + return nil, err + } - collectorPidFilePath := "/etc/otelcol/run/pid.file" + collectorDaemonSet := &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "apps/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name(config.NamePrefix, openTelemetryCollectorAgent), + Namespace: config.Namespace, + Labels: labels(true), + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: daemonSetMatchLabels, + }, + UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: daemonSetMatchLabels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: serviceAccountName(config.NamePrefix), + SecurityContext: &corev1.PodSecurityContext{}, + // This setting is required to enable the configuration reloader process to send Unix signals to the + // collector process. + ShareProcessNamespace: &util.True, + InitContainers: []corev1.Container{assembleFileLogOffsetSynchInitContainer(config)}, + Containers: []corev1.Container{ + collectorContainer, + assembleConfigurationReloaderContainer(config), + assembleFileLogOffsetSynchContainer(config), + }, + Volumes: assembleCollectorDaemonSetVolumes(config, configMapItems), + HostNetwork: false, + }, + }, + }, + } + + if config.SelfMonitoringConfiguration.Enabled { + err = selfmonitoring.EnableSelfMonitoringInCollectorDaemonSet( + collectorDaemonSet, + config.SelfMonitoringConfiguration, + config.Images.GetOperatorVersion(), + config.DevelopmentMode, + ) + if err != nil { + return nil, err + } + } + + return collectorDaemonSet, nil +} + +func assembleFileLogOffsetSynchContainer(config *oTelColConfig) corev1.Container { + filelogOffsetSynchContainer := corev1.Container{ + Name: "filelog-offset-synch", + Args: []string{"--mode=synch"}, + SecurityContext: &corev1.SecurityContext{}, + Image: config.Images.FilelogOffsetSynchImage, + Env: []corev1.EnvVar{ + { + Name: "GOMEMLIMIT", + Value: "4MiB", + }, + { + Name: "K8S_CONFIGMAP_NAMESPACE", + Value: config.Namespace, + }, + { + Name: "K8S_CONFIGMAP_NAME", + Value: filelogReceiverOffsetsConfigMapName(config.NamePrefix), + }, + + { + Name: "FILELOG_OFFSET_DIRECTORY_PATH", + Value: offsetsDirPath, + }, + k8sNodeNameEnvVar, + k8sPodUidEnvVar, + }, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("12Mi"), + }, + }, + VolumeMounts: []corev1.VolumeMount{filelogReceiverOffsetsVolumeMount}, + } + if config.Images.FilelogOffsetSynchImagePullPolicy != "" { + filelogOffsetSynchContainer.ImagePullPolicy = config.Images.FilelogOffsetSynchImagePullPolicy + } + return filelogOffsetSynchContainer +} +func assembleCollectorDaemonSetVolumes( + config *oTelColConfig, + configMapItems []corev1.KeyToPath, +) []corev1.Volume { pidFileVolumeSizeLimit := resource.MustParse("1M") offsetsVolumeSizeLimit := resource.MustParse("10M") - volumes := []corev1.Volume{ + return []corev1.Volume{ { Name: "filelogreceiver-offsets", VolumeSource: corev1.VolumeSource{ @@ -336,29 +493,10 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { }, }, } +} - collectorConfigVolume := corev1.VolumeMount{ - Name: "opentelemetry-collector-configmap", - MountPath: "/etc/otelcol/conf", - ReadOnly: true, - } - - collectorPidFileMountRW := corev1.VolumeMount{ - Name: pidFileVolumeName, - MountPath: filepath.Dir(collectorPidFilePath), - ReadOnly: false, - } - - collectorPidFileMountRO := collectorPidFileMountRW - collectorPidFileMountRO.ReadOnly = true - - filelogReceiverOffsetsVolumeMount := corev1.VolumeMount{ - Name: "filelogreceiver-offsets", - MountPath: offsetsDirPath, - ReadOnly: false, - } - - collectorVolumeMounts := []corev1.VolumeMount{ +func assembleCollectorDaemonSetVolumeMounts() []corev1.VolumeMount { + return []corev1.VolumeMount{ collectorConfigVolume, collectorPidFileMountRW, { @@ -375,29 +513,9 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { }, filelogReceiverOffsetsVolumeMount, } +} - nodeNameFieldSpec := corev1.ObjectFieldSelector{ - FieldPath: "spec.nodeName", - } - - podUidFieldSpec := corev1.ObjectFieldSelector{ - FieldPath: "metadata.uid", - } - - k8sNodeNameEnvVar := corev1.EnvVar{ - Name: "K8S_NODE_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &nodeNameFieldSpec, - }, - } - - k8sPodUidEnvVar := corev1.EnvVar{ - Name: "K8S_POD_UID", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &podUidFieldSpec, - }, - } - +func assembleCollectorDaemonSetEnvVars(config *oTelColConfig) ([]corev1.EnvVar, error) { collectorEnv := []corev1.EnvVar{ { Name: "MY_POD_IP", @@ -430,16 +548,17 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { collectorEnv = append(collectorEnv, authTokenEnvVar) } - probe := corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/", - Port: intstr.FromInt32(probesHttpPort), - }, - }, - } + return collectorEnv, nil +} - collectorConfigurationFilePath := "/etc/otelcol/conf/" + collectorConfigurationYaml +func assembleCollectorContainer( + config *oTelColConfig, +) (corev1.Container, error) { + collectorVolumeMounts := assembleCollectorDaemonSetVolumeMounts() + collectorEnv, err := assembleCollectorDaemonSetEnvVars(config) + if err != nil { + return corev1.Container{}, err + } collectorContainer := corev1.Container{ Name: openTelemetryCollector, @@ -461,8 +580,8 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { }, }, Env: collectorEnv, - LivenessProbe: &probe, - ReadinessProbe: &probe, + LivenessProbe: &collectorProbe, + ReadinessProbe: &collectorProbe, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceMemory: resource.MustParse("500Mi"), @@ -473,7 +592,12 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { if config.Images.CollectorImagePullPolicy != "" { collectorContainer.ImagePullPolicy = config.Images.CollectorImagePullPolicy } + return collectorContainer, nil +} +func assembleConfigurationReloaderContainer(config *oTelColConfig) corev1.Container { + collectorPidFileMountRO := collectorPidFileMountRW + collectorPidFileMountRO.ReadOnly = true configurationReloaderContainer := corev1.Container{ Name: configReloader, Args: []string{ @@ -500,7 +624,10 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { if config.Images.ConfigurationReloaderImagePullPolicy != "" { configurationReloaderContainer.ImagePullPolicy = config.Images.ConfigurationReloaderImagePullPolicy } + return configurationReloaderContainer +} +func assembleFileLogOffsetSynchInitContainer(config *oTelColConfig) corev1.Container { initFilelogOffsetSynchContainer := corev1.Container{ Name: "filelog-offset-init", Args: []string{"--mode=init"}, @@ -537,99 +664,14 @@ func assembleDaemonSet(config *oTelColConfig) (*appsv1.DaemonSet, error) { if config.Images.FilelogOffsetSynchImagePullPolicy != "" { initFilelogOffsetSynchContainer.ImagePullPolicy = config.Images.FilelogOffsetSynchImagePullPolicy } - - filelogOffsetSynchContainer := corev1.Container{ - Name: "filelog-offset-synch", - Args: []string{"--mode=synch"}, - SecurityContext: &corev1.SecurityContext{}, - Image: config.Images.FilelogOffsetSynchImage, - Env: []corev1.EnvVar{ - { - Name: "GOMEMLIMIT", - Value: "4MiB", - }, - { - Name: "K8S_CONFIGMAP_NAMESPACE", - Value: config.Namespace, - }, - { - Name: "K8S_CONFIGMAP_NAME", - Value: filelogReceiverOffsetsConfigMapName(config.NamePrefix), - }, - - { - Name: "FILELOG_OFFSET_DIRECTORY_PATH", - Value: offsetsDirPath, - }, - k8sNodeNameEnvVar, - k8sPodUidEnvVar, - }, - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("12Mi"), - }, - }, - VolumeMounts: []corev1.VolumeMount{filelogReceiverOffsetsVolumeMount}, - } - if config.Images.FilelogOffsetSynchImagePullPolicy != "" { - filelogOffsetSynchContainer.ImagePullPolicy = config.Images.FilelogOffsetSynchImagePullPolicy - } - - collectorDaemonSet := &appsv1.DaemonSet{ - TypeMeta: metav1.TypeMeta{ - Kind: "DaemonSet", - APIVersion: "apps/v1", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name(config.NamePrefix, openTelemetryCollectorAgent), - Namespace: config.Namespace, - Labels: labels(true), - }, - Spec: appsv1.DaemonSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: daemonSetMatchLabels, - }, - UpdateStrategy: appsv1.DaemonSetUpdateStrategy{ - Type: appsv1.RollingUpdateDaemonSetStrategyType, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: daemonSetMatchLabels, - }, - Spec: corev1.PodSpec{ - ServiceAccountName: serviceAccountName(config.NamePrefix), - SecurityContext: &corev1.PodSecurityContext{}, - // This setting is required to enable the configuration reloader process to send Unix signals to the - // collector process. - ShareProcessNamespace: &util.True, - InitContainers: []corev1.Container{initFilelogOffsetSynchContainer}, - Containers: []corev1.Container{ - collectorContainer, - configurationReloaderContainer, - filelogOffsetSynchContainer, - }, - Volumes: volumes, - HostNetwork: false, - }, - }, - }, - } - - if config.SelfMonitoringConfiguration.Enabled { - err := selfmonitoring.EnableSelfMonitoringInCollectorDaemonSet( - collectorDaemonSet, - config.SelfMonitoringConfiguration, - config.Images.GetOperatorVersion(), - config.DevelopmentMode, - ) - if err != nil { - return nil, err - } - } - - return collectorDaemonSet, nil + return initFilelogOffsetSynchContainer } +// func assembleCollectorDeployment(config *oTelColConfig) (*appsv1.DaemonSet, error) { +// // TODO implement the deployment assemblage +// return nil, nil +// } + func serviceAccountName(namePrefix string) string { return name(namePrefix, openTelemetryCollector) }