From a72bbd17062400dd5ea118294595dd2f0c79e977 Mon Sep 17 00:00:00 2001 From: Luiz Pegoraro Date: Wed, 1 Nov 2023 14:27:49 -0300 Subject: [PATCH] feat(agent): add struct parse to otel configuration. --- agent/backend/otel/exporter_builder.go | 110 +++++++++++++++++++- agent/backend/otel/exporter_builder_test.go | 67 ++++++++++++ 2 files changed, 174 insertions(+), 3 deletions(-) create mode 100644 agent/backend/otel/exporter_builder_test.go diff --git a/agent/backend/otel/exporter_builder.go b/agent/backend/otel/exporter_builder.go index 8c2ed53d4..125e16994 100644 --- a/agent/backend/otel/exporter_builder.go +++ b/agent/backend/otel/exporter_builder.go @@ -1,13 +1,117 @@ package otel -// TODO Create a struct to hold the exporter and processor as default to inject the policy id and name as attribute with an attribute processor -type defaultExporters struct { +import ( + "go.uber.org/zap" + "gopkg.in/yaml.v3" +) + +type ExporterBuilder interface { + GetStructFromYaml(yamlString string) (openTelemetryConfig, error) + MergeDefaultValueWithPolicy(config openTelemetryConfig, policyName string) (openTelemetryConfig, error) +} + +type openTelemetryConfig struct { + Receivers map[string]interface{} `yaml:"receivers"` + Processors map[string]interface{} `yaml:"processors"` + Extensions map[string]interface{} `yaml:"extensions"` + Exporters *exporters `yaml:"exporters"` + Service *service `yaml:"service"` +} + +type exporters struct { Otlp *defaultOtlpExporter `yaml:"otlp"` } type defaultOtlpExporter struct { + Endpoint string `yaml:"endpoint"` + Tls *tls `yaml:"tls"` +} + +type tls struct { + Insecure bool `yaml:"insecure"` +} + +type service struct { + Pipelines *pipelines `yaml:"pipelines"` +} + +type pipelines struct { + Metrics *pipeline `yaml:"metrics"` + Traces *pipeline `yaml:"traces"` + Logs *pipeline `yaml:"logs"` } -func (o *openTelemetryBackend) buildDefaultExporterAndProcessor() { +type pipeline struct { + Exporters []string `yaml:"exporters"` + Receivers []string `yaml:"receivers"` + Processors []string `yaml:"processors"` +} + +func getExporterBuilder(logger *zap.Logger) *exporterBuilder { + return &exporterBuilder{logger: logger} +} + +type exporterBuilder struct { + logger *zap.Logger +} + +func (e *exporterBuilder) GetStructFromYaml(yamlString string) (openTelemetryConfig, error) { + var config openTelemetryConfig + err := yaml.Unmarshal([]byte(yamlString), &config) + if err != nil { + e.logger.Error("failed to unmarshal yaml string", zap.Error(err)) + return openTelemetryConfig{}, err + } + return config, nil +} + +func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig, policyId string, policyName string) (openTelemetryConfig, error) { + defaultOtlpExporter := defaultOtlpExporter{ + Endpoint: "localhost:4317", + Tls: &tls{ + Insecure: true, + }, + } + // Override any openTelemetry exporter that may come, to connect to agent's otlp receiver + config.Exporters = &exporters{&defaultOtlpExporter} + config.Processors["attributes/policy_data"] = map[string]interface{}{ + "actions": []struct { + Key string `yaml:"key"` + Value string `yaml:"value"` + Action string `yaml:"action"` + }{ + {Key: "policy_id", Value: policyId, Action: "insert"}, + {Key: "policy_name", Value: policyName, Action: "insert"}, + }, + } + // Override metrics exporter + config.Service.Pipelines.Metrics.Exporters = []string{"otlp"} + config.Service.Pipelines.Traces.Exporters = []string{"otlp"} + config.Service.Pipelines.Logs.Exporters = []string{"otlp"} + + // Append attributes processor + config.Service.Pipelines.Metrics.Processors = append(config.Service.Pipelines.Metrics.Processors, "attributes/policy_data") + config.Service.Pipelines.Traces.Processors = append(config.Service.Pipelines.Traces.Processors, "attributes/policy_data") + config.Service.Pipelines.Logs.Processors = append(config.Service.Pipelines.Logs.Processors, "attributes/policy_data") + + return config, nil +} +func (o *openTelemetryBackend) buildDefaultExporterAndProcessor(policyYaml string, policyId string, policyName string) (openTelemetryConfig, error) { + defaultPolicyYaml, err := yaml.Marshal(policyYaml) + if err != nil { + o.logger.Warn("yaml policy marshal failure", zap.String("policy_id", policyId)) + return openTelemetryConfig{}, err + } + defaultPolicyString := string(defaultPolicyYaml) + builder := getExporterBuilder(o.logger) + defaultPolicyStruct, err := builder.GetStructFromYaml(defaultPolicyString) + if err != nil { + return openTelemetryConfig{}, err + } + defaultPolicyStruct, err = builder.MergeDefaultValueWithPolicy(defaultPolicyStruct, policyId, policyName) + if err != nil { + return openTelemetryConfig{}, err + } + return defaultPolicyStruct, nil } diff --git a/agent/backend/otel/exporter_builder_test.go b/agent/backend/otel/exporter_builder_test.go new file mode 100644 index 000000000..548979689 --- /dev/null +++ b/agent/backend/otel/exporter_builder_test.go @@ -0,0 +1,67 @@ +package otel + +import ( + "go.uber.org/zap" + "testing" +) + +func TestBuildDefaultPolicy(t *testing.T) { + testCases := []struct { + caseName string + inputString string + policyId string + policyName string + expectedStruct openTelemetryConfig + processedString string + wantErr error + }{ + { + caseName: "success default policy test", + inputString: ` +--- +receivers: + httpcheck: + targets: + - endpoint: http://orb.live + method: GET + - endpoint: http://orb.community + method: GET + headers: + test-header: "test-value" + collection_interval: 30s +exporters: + otlp: + endpoint: otelconsumer:45317 + tls: + insecure: true +service: + pipelines: + metrics: + exporters: + - otlp + receivers: + - httpcheck +`, + policyId: "test-policy-id", + policyName: "test-policy", + }, + } + for _, testCase := range testCases { + t.Run(testCase.caseName, func(t *testing.T) { + logger := zap.NewNop() + exporterBuilder := getExporterBuilder(logger) + gotOtelConfig, err := exporterBuilder.GetStructFromYaml(testCase.inputString) + if err != nil { + t.Errorf("failed to merge default value with policy: %v", err) + } + expectedStruct, err := exporterBuilder.MergeDefaultValueWithPolicy(gotOtelConfig, testCase.policyId, testCase.policyName) + if err != nil { + t.Errorf("failed to merge default value with policy: %v", err) + } + if _, ok := expectedStruct.Processors["attributes/policy_data"]; !ok { + t.Error("missing required attributes/policy_data processor", err) + } + + }) + } +}