From 40be3bac1f365a46627f7626803f04a35a9f20c8 Mon Sep 17 00:00:00 2001 From: Foad Nosrati Habibi Date: Thu, 24 Oct 2024 07:42:18 +1100 Subject: [PATCH] Add resource_keys routing for the traces --- exporter/loadbalancingexporter/config.go | 8 +- .../loadbalancingexporter/trace_exporter.go | 42 ++-- .../trace_exporter_test.go | 189 ++++++++++++++++-- 3 files changed, 203 insertions(+), 36 deletions(-) diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index d3109bc9056a..212531567e48 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -17,13 +17,15 @@ const ( svcRouting metricNameRouting resourceRouting + resourceKeysRouting ) // Config defines configuration for the exporter. type Config struct { - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + RoutingKey string `mapstructure:"routing_key"` + ResourceKeys []string `mapstructure:"resource_keys"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index a6f955c69975..25d394514c86 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -27,8 +27,9 @@ var _ exporter.Traces = (*traceExporterImp)(nil) type exporterTraces map[*wrappedExporter]ptrace.Traces type traceExporterImp struct { - loadBalancer *loadBalancer - routingKey routingKey + loadBalancer *loadBalancer + routingKey routingKey + routingResourceKeys []string stopped bool shutdownWg sync.WaitGroup @@ -38,8 +39,9 @@ type traceExporterImp struct { func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*traceExporterImp, error) { exporterFactory := otlpexporter.NewFactory() + eCfg := cfg.(*Config) lb, err := newLoadBalancer(params, cfg, func(ctx context.Context, endpoint string) (component.Component, error) { - oCfg := buildExporterConfig(cfg.(*Config), endpoint) + oCfg := buildExporterConfig(eCfg, endpoint) return exporterFactory.CreateTracesExporter(ctx, params, &oCfg) }) if err != nil { @@ -48,9 +50,13 @@ func newTracesExporter(params exporter.CreateSettings, cfg component.Config) (*t traceExporter := traceExporterImp{loadBalancer: lb, routingKey: traceIDRouting} - switch cfg.(*Config).RoutingKey { + switch eCfg.RoutingKey { case "service": - traceExporter.routingKey = svcRouting + traceExporter.routingKey = resourceKeysRouting + traceExporter.routingResourceKeys = []string{"service.name"} + case "resource": + traceExporter.routingKey = resourceKeysRouting + traceExporter.routingResourceKeys = eCfg.ResourceKeys case "traceID", "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -85,7 +91,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) exporterSegregatedTraces := make(exporterTraces) endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { - routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) + routingID, err := e.routingIdentifiersFromTraces(batch) if err != nil { return err } @@ -132,7 +138,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) return errs } -func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { +func (e *traceExporterImp) routingIdentifiersFromTraces(td ptrace.Traces) (map[string]bool, error) { ids := make(map[string]bool) rs := td.ResourceSpans() if rs.Len() == 0 { @@ -149,15 +155,25 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string] return nil, errors.New("empty spans") } - if key == svcRouting { + if e.routingKey == resourceKeysRouting { + var missingResourceKey bool for i := 0; i < rs.Len(); i++ { - svc, ok := rs.At(i).Resource().Attributes().Get("service.name") - if !ok { - return nil, errors.New("unable to get service name") + var resourceKeyFound bool + rsi := rs.At(i) + for _, attrKey := range e.routingResourceKeys { + if v, ok := rsi.Resource().Attributes().Get(attrKey); ok { + ids[v.AsString()] = true + resourceKeyFound = true + break + } + } + if !resourceKeyFound { + missingResourceKey = true } - ids[svc.Str()] = true } - return ids, nil + if !missingResourceKey { + return ids, nil + } } tid := spans.At(0).TraceID() ids[string(tid[:])] = true diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index cb544d118291..2deb4b5ff3e6 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -35,27 +35,52 @@ import ( func TestNewTracesExporter(t *testing.T) { for _, tt := range []struct { - desc string - config *Config - err error + desc string + config *Config + wantRoutingKey routingKey + wantRoutingResourceKeys []string + err error }{ { "simple", simpleConfig(), + traceIDRouting, + nil, + nil, + }, + { + "service", + serviceBasedRoutingConfig(), + resourceKeysRouting, + []string{conventions.AttributeServiceName}, + nil, + }, + { + "resource_keys", + resourceKeysBasedRoutingConfig(), + resourceKeysRouting, + []string{"resource.key_1", "resource.key_2"}, nil, }, { "empty", &Config{}, + 0, + nil, errNoResolver, }, } { t.Run(tt.desc, func(t *testing.T) { // test - _, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config) + te, err := newTracesExporter(exportertest.NewNopCreateSettings(), tt.config) // verify require.Equal(t, tt.err, err) + if err != nil { + return + } + require.Equal(t, tt.wantRoutingKey, te.routingKey) + require.Equal(t, tt.wantRoutingResourceKeys, te.routingResourceKeys) }) } } @@ -219,7 +244,8 @@ func TestConsumeTracesServiceBased(t *testing.T) { p, err := newTracesExporter(exportertest.NewNopCreateSettings(), serviceBasedRoutingConfig()) require.NotNil(t, p) require.NoError(t, err) - assert.Equal(t, p.routingKey, svcRouting) + assert.Equal(t, p.routingKey, resourceKeysRouting) + assert.Equal(t, p.routingResourceKeys, []string{"service.name"}) // pre-load an exporter here, so that we don't use the actual OTLP exporter lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) @@ -245,29 +271,113 @@ func TestConsumeTracesServiceBased(t *testing.T) { assert.Nil(t, res) } +func TestConsumeTracesResourceKeysBased(t *testing.T) { + componentFactory := func(_ context.Context, _ string) (component.Component, error) { + return newNopMockTracesExporter(), nil + } + lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig(), componentFactory) + require.NotNil(t, lb) + require.NoError(t, err) + + p, err := newTracesExporter(exportertest.NewNopCreateSettings(), resourceKeysBasedRoutingConfig()) + require.NotNil(t, p) + require.NoError(t, err) + assert.Equal(t, p.routingKey, resourceKeysRouting) + assert.Equal(t, p.routingResourceKeys, []string{"resource.key_1", "resource.key_2"}) + + // pre-load an exporter here, so that we don't use the actual OTLP exporter + lb.addMissingExporters(context.Background(), []string{"endpoint-1"}) + lb.addMissingExporters(context.Background(), []string{"endpoint-2"}) + lb.res = &mockResolver{ + triggerCallbacks: true, + onResolve: func(_ context.Context) ([]string, error) { + return []string{"endpoint-1", "endpoint-2"}, nil + }, + } + p.loadBalancer = lb + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + require.NoError(t, p.Shutdown(context.Background())) + }() + + // test + res := p.ConsumeTraces(context.Background(), simpleTracesWithResourceKeys()) + + // verify + assert.Nil(t, res) +} + func TestServiceBasedRoutingForSameTraceId(t *testing.T) { b := pcommon.TraceID([16]byte{1, 2, 3, 4}) for _, tt := range []struct { - desc string - batch ptrace.Traces - routingKey routingKey - res map[string]bool + te *traceExporterImp + desc string + batch ptrace.Traces + res map[string]bool }{ { + &traceExporterImp{ + routingKey: resourceKeysRouting, + routingResourceKeys: []string{"service.name"}, + }, "same trace id and different services - service based routing", twoServicesWithSameTraceID(), - svcRouting, map[string]bool{"ad-service-1": true, "get-recommendations-7": true}, }, { + &traceExporterImp{ + routingKey: traceIDRouting, + }, "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), - traceIDRouting, map[string]bool{string(b[:]): true}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := tt.te.routingIdentifiersFromTraces(tt.batch) + assert.Equal(t, err, nil) + assert.Equal(t, res, tt.res) + }) + } +} + +func TestResourceKeysBasedRoutingIdentifiers(t *testing.T) { + b := pcommon.TraceID([16]byte{1, 2, 3, 4}) + for _, tt := range []struct { + te *traceExporterImp + desc string + batch ptrace.Traces + res map[string]bool + }{ + { + &traceExporterImp{ + routingKey: resourceKeysRouting, + routingResourceKeys: []string{"resource.key_1", "resource.key_2"}, + }, + "two resource_keys values", + simpleTracesWithResourceKeys(), + map[string]bool{ + "val-1": true, + "val-2": true, + }, + }, + { + &traceExporterImp{ + routingKey: resourceKeysRouting, + routingResourceKeys: []string{"resource.key_1"}, + }, + "single resource_keys value with trace ID as default", + simpleTracesWithResourceKeys(), + map[string]bool{ + "val-1": true, + string(b[:]): true, + }, + }, + } { + t.Run(tt.desc, func(t *testing.T) { + res, err := tt.te.routingIdentifiersFromTraces(tt.batch) assert.Equal(t, err, nil) assert.Equal(t, res, tt.res) }) @@ -405,40 +515,46 @@ func TestBatchWithTwoTraces(t *testing.T) { func TestNoTracesInBatch(t *testing.T) { for _, tt := range []struct { - desc string - batch ptrace.Traces - routingKey routingKey - err error + te *traceExporterImp + desc string + batch ptrace.Traces + err error }{ { + &traceExporterImp{ + routingKey: svcRouting, + }, "no resource spans", ptrace.NewTraces(), - traceIDRouting, errors.New("empty resource spans"), }, { + &traceExporterImp{ + routingKey: traceIDRouting, + }, "no instrumentation library spans", func() ptrace.Traces { batch := ptrace.NewTraces() batch.ResourceSpans().AppendEmpty() return batch }(), - traceIDRouting, errors.New("empty scope spans"), }, { + &traceExporterImp{ + routingKey: svcRouting, + }, "no spans", func() ptrace.Traces { batch := ptrace.NewTraces() batch.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() return batch }(), - svcRouting, errors.New("empty spans"), }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := tt.te.routingIdentifiersFromTraces(tt.batch) assert.Equal(t, err, tt.err) assert.Equal(t, res, map[string]bool(nil)) }) @@ -684,6 +800,29 @@ func simpleTraces() ptrace.Traces { return traces } +func simpleTracesWithResourceKeys() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + rSpans := traces.ResourceSpans().AppendEmpty() + rAttrs := rSpans.Resource().Attributes() + rAttrs.PutStr("resource.key_1", "val-1") + rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + + rSpans = traces.ResourceSpans().AppendEmpty() + rAttrs = rSpans.Resource().Attributes() + rAttrs.PutStr("resource.key_2", "val-2") + rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + + rSpans = traces.ResourceSpans().AppendEmpty() + rAttrs = rSpans.Resource().Attributes() + rAttrs.PutStr("resource.key_1", "val-1") + rAttrs.PutStr("resource.key_2", "val-2") + rSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID([16]byte{1, 2, 3, 4}) + + return traces +} + func simpleTracesWithServiceName() ptrace.Traces { traces := ptrace.NewTraces() traces.ResourceSpans().EnsureCapacity(1) @@ -736,6 +875,16 @@ func serviceBasedRoutingConfig() *Config { } } +func resourceKeysBasedRoutingConfig() *Config { + return &Config{ + Resolver: ResolverSettings{ + Static: &StaticResolver{Hostnames: []string{"endpoint-1", "endpoint-2"}}, + }, + RoutingKey: "resource_keys", + ResourceKeys: []string{"resource.key_1", "resource.key_2"}, + } +} + type mockTracesExporter struct { component.Component ConsumeTracesFn func(ctx context.Context, td ptrace.Traces) error