Skip to content

Commit

Permalink
feat(agent): add struct parse to otel configuration.
Browse files Browse the repository at this point in the history
  • Loading branch information
Luiz Pegoraro committed Nov 1, 2023
1 parent eb1c5b5 commit a72bbd1
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 3 deletions.
110 changes: 107 additions & 3 deletions agent/backend/otel/exporter_builder.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,117 @@
package otel

// TODO Create a struct to hold the exporter and processor as default to inject the policy id and name as attribute with an attribute processor
type defaultExporters struct {
import (
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)

type ExporterBuilder interface {
GetStructFromYaml(yamlString string) (openTelemetryConfig, error)
MergeDefaultValueWithPolicy(config openTelemetryConfig, policyName string) (openTelemetryConfig, error)
}

type openTelemetryConfig struct {
Receivers map[string]interface{} `yaml:"receivers"`
Processors map[string]interface{} `yaml:"processors"`
Extensions map[string]interface{} `yaml:"extensions"`
Exporters *exporters `yaml:"exporters"`
Service *service `yaml:"service"`
}

type exporters struct {
Otlp *defaultOtlpExporter `yaml:"otlp"`
}

type defaultOtlpExporter struct {
Endpoint string `yaml:"endpoint"`
Tls *tls `yaml:"tls"`
}

type tls struct {
Insecure bool `yaml:"insecure"`
}

type service struct {
Pipelines *pipelines `yaml:"pipelines"`
}

type pipelines struct {
Metrics *pipeline `yaml:"metrics"`
Traces *pipeline `yaml:"traces"`
Logs *pipeline `yaml:"logs"`
}

func (o *openTelemetryBackend) buildDefaultExporterAndProcessor() {
type pipeline struct {
Exporters []string `yaml:"exporters"`
Receivers []string `yaml:"receivers"`
Processors []string `yaml:"processors"`
}

func getExporterBuilder(logger *zap.Logger) *exporterBuilder {
return &exporterBuilder{logger: logger}
}

type exporterBuilder struct {
logger *zap.Logger
}

func (e *exporterBuilder) GetStructFromYaml(yamlString string) (openTelemetryConfig, error) {
var config openTelemetryConfig
err := yaml.Unmarshal([]byte(yamlString), &config)
if err != nil {
e.logger.Error("failed to unmarshal yaml string", zap.Error(err))
return openTelemetryConfig{}, err
}
return config, nil
}

func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig, policyId string, policyName string) (openTelemetryConfig, error) {
defaultOtlpExporter := defaultOtlpExporter{
Endpoint: "localhost:4317",
Tls: &tls{
Insecure: true,
},
}
// Override any openTelemetry exporter that may come, to connect to agent's otlp receiver
config.Exporters = &exporters{&defaultOtlpExporter}
config.Processors["attributes/policy_data"] = map[string]interface{}{
"actions": []struct {
Key string `yaml:"key"`
Value string `yaml:"value"`
Action string `yaml:"action"`
}{
{Key: "policy_id", Value: policyId, Action: "insert"},
{Key: "policy_name", Value: policyName, Action: "insert"},
},
}
// Override metrics exporter
config.Service.Pipelines.Metrics.Exporters = []string{"otlp"}
config.Service.Pipelines.Traces.Exporters = []string{"otlp"}
config.Service.Pipelines.Logs.Exporters = []string{"otlp"}

// Append attributes processor
config.Service.Pipelines.Metrics.Processors = append(config.Service.Pipelines.Metrics.Processors, "attributes/policy_data")
config.Service.Pipelines.Traces.Processors = append(config.Service.Pipelines.Traces.Processors, "attributes/policy_data")
config.Service.Pipelines.Logs.Processors = append(config.Service.Pipelines.Logs.Processors, "attributes/policy_data")

return config, nil
}

func (o *openTelemetryBackend) buildDefaultExporterAndProcessor(policyYaml string, policyId string, policyName string) (openTelemetryConfig, error) {
defaultPolicyYaml, err := yaml.Marshal(policyYaml)
if err != nil {
o.logger.Warn("yaml policy marshal failure", zap.String("policy_id", policyId))
return openTelemetryConfig{}, err
}
defaultPolicyString := string(defaultPolicyYaml)
builder := getExporterBuilder(o.logger)
defaultPolicyStruct, err := builder.GetStructFromYaml(defaultPolicyString)
if err != nil {
return openTelemetryConfig{}, err
}
defaultPolicyStruct, err = builder.MergeDefaultValueWithPolicy(defaultPolicyStruct, policyId, policyName)
if err != nil {
return openTelemetryConfig{}, err
}
return defaultPolicyStruct, nil
}
67 changes: 67 additions & 0 deletions agent/backend/otel/exporter_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package otel

import (
"go.uber.org/zap"
"testing"
)

func TestBuildDefaultPolicy(t *testing.T) {
testCases := []struct {
caseName string
inputString string
policyId string
policyName string
expectedStruct openTelemetryConfig
processedString string
wantErr error
}{
{
caseName: "success default policy test",
inputString: `
---
receivers:
httpcheck:
targets:
- endpoint: http://orb.live
method: GET
- endpoint: http://orb.community
method: GET
headers:
test-header: "test-value"
collection_interval: 30s
exporters:
otlp:
endpoint: otelconsumer:45317
tls:
insecure: true
service:
pipelines:
metrics:
exporters:
- otlp
receivers:
- httpcheck
`,
policyId: "test-policy-id",
policyName: "test-policy",
},
}
for _, testCase := range testCases {
t.Run(testCase.caseName, func(t *testing.T) {
logger := zap.NewNop()
exporterBuilder := getExporterBuilder(logger)
gotOtelConfig, err := exporterBuilder.GetStructFromYaml(testCase.inputString)
if err != nil {
t.Errorf("failed to merge default value with policy: %v", err)
}
expectedStruct, err := exporterBuilder.MergeDefaultValueWithPolicy(gotOtelConfig, testCase.policyId, testCase.policyName)
if err != nil {
t.Errorf("failed to merge default value with policy: %v", err)
}
if _, ok := expectedStruct.Processors["attributes/policy_data"]; !ok {
t.Error("missing required attributes/policy_data processor", err)
}

})
}
}

0 comments on commit a72bbd1

Please sign in to comment.