From a03a2d320ead76b635f5d1f88c33d5e87a68c190 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Tue, 12 Nov 2024 07:18:30 -0500 Subject: [PATCH] [connector/routing] Add ability to route by span context (#36277) --- .chloggen/routing-by-traces.yaml | 27 +++ connector/routingconnector/README.md | 5 +- connector/routingconnector/config.go | 2 +- connector/routingconnector/config_test.go | 16 ++ .../internal/ptraceutil/traces.go | 49 ++++- .../internal/ptraceutil/traces_test.go | 144 ++++++++++++ .../internal/ptraceutiltest/traces.go | 54 +++++ .../internal/ptraceutiltest/traces_test.go | 50 +++++ connector/routingconnector/router.go | 26 ++- connector/routingconnector/traces.go | 10 + connector/routingconnector/traces_test.go | 206 +++++++++++++++++- 11 files changed, 575 insertions(+), 14 deletions(-) create mode 100644 .chloggen/routing-by-traces.yaml diff --git a/.chloggen/routing-by-traces.yaml b/.chloggen/routing-by-traces.yaml new file mode 100644 index 000000000000..3414a5e8064a --- /dev/null +++ b/.chloggen/routing-by-traces.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add ability to route by span context + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36276] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md index 02ad40317832..d7a3a7731419 100644 --- a/connector/routingconnector/README.md +++ b/connector/routingconnector/README.md @@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs The following settings are available: - `table (required)`: the routing table for this connector. -- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `metric`, `log`, and `request` are supported. +- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `span`, `metric`, `log`, and `request` are supported. - `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context. - `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context. - `table.pipelines (required)`: the list of pipelines to use when the routing condition is met. @@ -43,7 +43,7 @@ The following settings are available: ### Limitations -- The `match_once` setting is only supported when using the `resource` context. If any routes use `metric`, `log` or `request` context, `match_once` must be set to `true`. +- The `match_once` setting is only supported when using the `resource` context. If any routes use `span`, `metric`, `log` or `request` context, `match_once` must be set to `true`. - The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.) ### Supported [OTTL] functions @@ -287,7 +287,6 @@ service: ## Differences between the Routing Connector and Routing Processor -- Routing on context values is only supported for logs at this time. - The connector routes to pipelines, not exporters as the processor does. [Connectors README]:https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md diff --git a/connector/routingconnector/config.go b/connector/routingconnector/config.go index fb2f838474c7..33a0c702bca9 100644 --- a/connector/routingconnector/config.go +++ b/connector/routingconnector/config.go @@ -77,7 +77,7 @@ func (c *Config) Validate() error { return err } fallthrough - case "metric", "log": // ok + case "span", "metric", "log": // ok if !c.MatchOnce { return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context) } diff --git a/connector/routingconnector/config_test.go b/connector/routingconnector/config_test.go index b79eb4ee1bf3..4a0ef0d0d5a4 100644 --- a/connector/routingconnector/config_test.go +++ b/connector/routingconnector/config_test.go @@ -218,6 +218,22 @@ func TestValidateConfig(t *testing.T) { }, error: "invalid context: invalid", }, + { + name: "span context with match_once false", + config: &Config{ + MatchOnce: false, + Table: []RoutingTableItem{ + { + Context: "span", + Statement: `route() where attributes["attr"] == "acme"`, + Pipelines: []pipeline.ID{ + pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), + }, + }, + }, + }, + error: `"span" context is not supported with "match_once: false"`, + }, { name: "metric context with match_once false", config: &Config{ diff --git a/connector/routingconnector/internal/ptraceutil/traces.go b/connector/routingconnector/internal/ptraceutil/traces.go index 4f925fb98fcb..e47bb7529dce 100644 --- a/connector/routingconnector/internal/ptraceutil/traces.go +++ b/connector/routingconnector/internal/ptraceutil/traces.go @@ -8,11 +8,54 @@ import "go.opentelemetry.io/collector/pdata/ptrace" // MoveResourcesIf calls f sequentially for each ResourceSpans present in the first ptrace.Traces. // If f returns true, the element is removed from the first ptrace.Traces and added to the second ptrace.Traces. func MoveResourcesIf(from, to ptrace.Traces, f func(ptrace.ResourceSpans) bool) { - from.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { - if !f(rs) { + from.ResourceSpans().RemoveIf(func(resoruceSpans ptrace.ResourceSpans) bool { + if !f(resoruceSpans) { return false } - rs.CopyTo(to.ResourceSpans().AppendEmpty()) + resoruceSpans.CopyTo(to.ResourceSpans().AppendEmpty()) return true }) } + +// MoveSpansWithContextIf calls f sequentially for each Span present in the first ptrace.Traces. +// If f returns true, the element is removed from the first ptrace.Traces and added to the second ptrace.Traces. +// Notably, the Resource and Scope associated with the Span are created in the second ptrace.Traces only once. +// Resources or Scopes are removed from the original if they become empty. All ordering is preserved. +func MoveSpansWithContextIf(from, to ptrace.Traces, f func(ptrace.ResourceSpans, ptrace.ScopeSpans, ptrace.Span) bool) { + resourceSpansSlice := from.ResourceSpans() + for i := 0; i < resourceSpansSlice.Len(); i++ { + resourceSpans := resourceSpansSlice.At(i) + scopeSpanSlice := resourceSpans.ScopeSpans() + var resourceSpansCopy *ptrace.ResourceSpans + for j := 0; j < scopeSpanSlice.Len(); j++ { + scopeSpans := scopeSpanSlice.At(j) + spanSlice := scopeSpans.Spans() + var scopeSpansCopy *ptrace.ScopeSpans + spanSlice.RemoveIf(func(span ptrace.Span) bool { + if !f(resourceSpans, scopeSpans, span) { + return false + } + if resourceSpansCopy == nil { + rmc := to.ResourceSpans().AppendEmpty() + resourceSpansCopy = &rmc + resourceSpans.Resource().CopyTo(resourceSpansCopy.Resource()) + resourceSpansCopy.SetSchemaUrl(resourceSpans.SchemaUrl()) + } + if scopeSpansCopy == nil { + smc := resourceSpansCopy.ScopeSpans().AppendEmpty() + scopeSpansCopy = &smc + scopeSpans.Scope().CopyTo(scopeSpansCopy.Scope()) + scopeSpansCopy.SetSchemaUrl(scopeSpans.SchemaUrl()) + } + span.CopyTo(scopeSpansCopy.Spans().AppendEmpty()) + return true + }) + } + scopeSpanSlice.RemoveIf(func(sm ptrace.ScopeSpans) bool { + return sm.Spans().Len() == 0 + }) + } + resourceSpansSlice.RemoveIf(func(resourceSpans ptrace.ResourceSpans) bool { + return resourceSpans.ScopeSpans().Len() == 0 + }) +} diff --git a/connector/routingconnector/internal/ptraceutil/traces_test.go b/connector/routingconnector/internal/ptraceutil/traces_test.go index 7a47633e2e8e..40d05c5bec8e 100644 --- a/connector/routingconnector/internal/ptraceutil/traces_test.go +++ b/connector/routingconnector/internal/ptraceutil/traces_test.go @@ -80,3 +80,147 @@ func TestMoveResourcesIf(t *testing.T) { }) } } + +func TestMoveSpansWithContextIf(t *testing.T) { + testCases := []struct { + name string + moveIf func(ptrace.ResourceSpans, ptrace.ScopeSpans, ptrace.Span) bool + from ptrace.Traces + to ptrace.Traces + expectFrom ptrace.Traces + expectTo ptrace.Traces + }{ + { + name: "move_none", + moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool { + return false + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectTo: ptrace.NewTraces(), + }, + { + name: "move_all", + moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool { + return true + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptrace.NewTraces(), + expectTo: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + }, + { + name: "move_all_from_one_resource", + moveIf: func(rl ptrace.ResourceSpans, _ ptrace.ScopeSpans, _ ptrace.Span) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"), + expectTo: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), + }, + { + name: "move_all_from_one_scope", + moveIf: func(rl ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeC" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ptraceutiltest.WithResource('B', + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ), + expectTo: ptraceutiltest.NewTraces("B", "C", "EF", "GH"), + }, + { + name: "move_all_from_one_scope_in_each_resource", + moveIf: func(_ ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool { + return sl.Scope().Name() == "scopeD" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTraces("AB", "C", "EF", "GH"), + expectTo: ptraceutiltest.NewTraces("AB", "D", "EF", "GH"), + }, + { + name: "move_one", + moveIf: func(rl ptrace.ResourceSpans, sl ptrace.ScopeSpans, m ptrace.Span) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeD" && m.Name() == "spanF" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH")), + ), + ptraceutiltest.WithResource('B', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ), + expectTo: ptraceutiltest.NewTraces("A", "D", "F", "GH"), + }, + { + name: "move_one_from_each_scope", + moveIf: func(_ ptrace.ResourceSpans, _ ptrace.ScopeSpans, m ptrace.Span) bool { + return m.Name() == "spanE" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectTo: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + }, + { + name: "move_one_from_each_scope_in_one_resource", + moveIf: func(rl ptrace.ResourceSpans, _ ptrace.ScopeSpans, m ptrace.Span) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" && m.Name() == "spanE" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptrace.NewTraces(), + expectFrom: ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ptraceutiltest.WithResource('B', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('F', "GH")), + ), + ), + expectTo: ptraceutiltest.NewTraces("B", "CD", "E", "GH"), + }, + { + name: "move_some_to_preexisting", + moveIf: func(_ ptrace.ResourceSpans, sl ptrace.ScopeSpans, _ ptrace.Span) bool { + return sl.Scope().Name() == "scopeD" + }, + from: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + to: ptraceutiltest.NewTraces("1", "2", "3", "4"), + expectFrom: ptraceutiltest.NewTraces("AB", "C", "EF", "GH"), + expectTo: ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('1', ptraceutiltest.WithScope('2', ptraceutiltest.WithSpan('3', "4"))), + ptraceutiltest.WithResource('A', ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH"))), + ptraceutiltest.WithResource('B', ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH"))), + ), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + ptraceutil.MoveSpansWithContextIf(tt.from, tt.to, tt.moveIf) + assert.NoError(t, ptracetest.CompareTraces(tt.expectFrom, tt.from), "from not modified as expected") + assert.NoError(t, ptracetest.CompareTraces(tt.expectTo, tt.to), "to not as expected") + }) + } +} diff --git a/connector/routingconnector/internal/ptraceutiltest/traces.go b/connector/routingconnector/internal/ptraceutiltest/traces.go index 863eeb888b58..4317a113e34b 100644 --- a/connector/routingconnector/internal/ptraceutiltest/traces.go +++ b/connector/routingconnector/internal/ptraceutiltest/traces.go @@ -42,3 +42,57 @@ func NewTraces(resourceIDs, scopeIDs, spanIDs, spanEventIDs string) ptrace.Trace } return td } + +type Resource struct { + id byte + scopes []Scope +} + +type Scope struct { + id byte + spans []Span +} + +type Span struct { + id byte + spanEvents string +} + +func WithResource(id byte, scopes ...Scope) Resource { + r := Resource{id: id} + r.scopes = append(r.scopes, scopes...) + return r +} + +func WithScope(id byte, spans ...Span) Scope { + s := Scope{id: id} + s.spans = append(s.spans, spans...) + return s +} + +func WithSpan(id byte, spanEvents string) Span { + return Span{id: id, spanEvents: spanEvents} +} + +// NewTracesFromOpts creates a ptrace.Traces with the specified resources, scopes, metrics, +// and data points. The general idea is the same as NewMetrics, but this function allows for +// more flexibility in creating non-uniform structures. +func NewTracesFromOpts(resources ...Resource) ptrace.Traces { + td := ptrace.NewTraces() + for _, resource := range resources { + r := td.ResourceSpans().AppendEmpty() + r.Resource().Attributes().PutStr("resourceName", "resource"+string(resource.id)) + for _, scope := range resource.scopes { + ss := r.ScopeSpans().AppendEmpty() + ss.Scope().SetName("scope" + string(scope.id)) + for _, span := range scope.spans { + s := ss.Spans().AppendEmpty() + s.SetName("span" + string(span.id)) + for i := 0; i < len(span.spanEvents); i++ { + s.Events().AppendEmpty().Attributes().PutStr("spanEventName", "spanEvent"+string(span.spanEvents[i])) + } + } + } + } + return td +} diff --git a/connector/routingconnector/internal/ptraceutiltest/traces_test.go b/connector/routingconnector/internal/ptraceutiltest/traces_test.go index 0de27955b630..41d553444d38 100644 --- a/connector/routingconnector/internal/ptraceutiltest/traces_test.go +++ b/connector/routingconnector/internal/ptraceutiltest/traces_test.go @@ -18,6 +18,7 @@ func TestNewTraces(t *testing.T) { t.Run("empty", func(t *testing.T) { expected := ptrace.NewTraces() assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("", "", "", ""))) + assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTracesFromOpts())) }) t.Run("simple", func(t *testing.T) { @@ -33,7 +34,15 @@ func TestNewTraces(t *testing.T) { se.Attributes().PutStr("spanEventName", "spanEventD") // resourceA.scopeB.spanC.spanEventD return td }() + fromOpts := ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('B', + ptraceutiltest.WithSpan('C', "D"), + ), + ), + ) assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("A", "B", "C", "D"))) + assert.NoError(t, ptracetest.CompareTraces(expected, fromOpts)) }) t.Run("two_resources", func(t *testing.T) { @@ -57,7 +66,20 @@ func TestNewTraces(t *testing.T) { se.Attributes().PutStr("spanEventName", "spanEventE") // resourceB.scopeC.spanD.spanEventE return td }() + fromOpts := ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('C', + ptraceutiltest.WithSpan('D', "E"), + ), + ), + ptraceutiltest.WithResource('B', + ptraceutiltest.WithScope('C', + ptraceutiltest.WithSpan('D', "E"), + ), + ), + ) assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("AB", "C", "D", "E"))) + assert.NoError(t, ptracetest.CompareTraces(expected, fromOpts)) }) t.Run("two_scopes", func(t *testing.T) { @@ -79,7 +101,18 @@ func TestNewTraces(t *testing.T) { se.Attributes().PutStr("spanEventName", "spanEventE") // resourceA.scopeC.spanD.spanEventE return td }() + fromOpts := ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('B', + ptraceutiltest.WithSpan('D', "E"), + ), + ptraceutiltest.WithScope('C', + ptraceutiltest.WithSpan('D', "E"), + ), + ), + ) assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("A", "BC", "D", "E"))) + assert.NoError(t, ptracetest.CompareTraces(expected, fromOpts)) }) t.Run("two_spans", func(t *testing.T) { @@ -99,7 +132,16 @@ func TestNewTraces(t *testing.T) { se.Attributes().PutStr("spanEventName", "spanEventE") // resourceA.scopeB.spanD.spanEventE return td }() + fromOpts := ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('B', + ptraceutiltest.WithSpan('C', "E"), + ptraceutiltest.WithSpan('D', "E"), + ), + ), + ) assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("A", "B", "CD", "E"))) + assert.NoError(t, ptracetest.CompareTraces(expected, fromOpts)) }) t.Run("two_spanevents", func(t *testing.T) { @@ -117,6 +159,14 @@ func TestNewTraces(t *testing.T) { se.Attributes().PutStr("spanEventName", "spanEventE") // resourceA.scopeB.spanC.spanEventE return td }() + fromOpts := ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('B', + ptraceutiltest.WithSpan('C', "DE"), + ), + ), + ) assert.NoError(t, ptracetest.CompareTraces(expected, ptraceutiltest.NewTraces("A", "B", "C", "DE"))) + assert.NoError(t, ptracetest.CompareTraces(expected, fromOpts)) }) } diff --git a/connector/routingconnector/router.go b/connector/routingconnector/router.go index 01dd13143261..98f05bc92287 100644 --- a/connector/routingconnector/router.go +++ b/connector/routingconnector/router.go @@ -17,6 +17,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" ) var errPipelineNotFound = errors.New("pipeline not found") @@ -32,6 +33,7 @@ type consumerProvider[C any] func(...pipeline.ID) (C, error) type router[C any] struct { logger *zap.Logger resourceParser ottl.Parser[ottlresource.TransformContext] + spanParser ottl.Parser[ottlspan.TransformContext] metricParser ottl.Parser[ottlmetric.TransformContext] logParser ottl.Parser[ottllog.TransformContext] @@ -74,16 +76,19 @@ type routingItem[C any] struct { statementContext string requestCondition *requestCondition resourceStatement *ottl.Statement[ottlresource.TransformContext] + spanStatement *ottl.Statement[ottlspan.TransformContext] metricStatement *ottl.Statement[ottlmetric.TransformContext] logStatement *ottl.Statement[ottllog.TransformContext] } func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.TelemetrySettings) error { - var buildResource, buildMetric, buildLog bool + var buildResource, buildSpan, buildMetric, buildLog bool for _, item := range table { switch item.Context { case "", "resource": buildResource = true + case "span": + buildSpan = true case "metric": buildMetric = true case "log": @@ -103,6 +108,17 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te errs = errors.Join(errs, err) } } + if buildSpan { + parser, err := ottlspan.NewParser( + common.Functions[ottlspan.TransformContext](), + settings, + ) + if err == nil { + r.spanParser = parser + } else { + errs = errors.Join(errs, err) + } + } if buildMetric { parser, err := ottlmetric.NewParser( common.Functions[ottlmetric.TransformContext](), @@ -110,8 +126,6 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te ) if err == nil { r.metricParser = parser - } else { - errs = errors.Join(errs, err) } } if buildLog { @@ -190,6 +204,12 @@ func (r *router[C]) registerRouteConsumers() (err error) { return err } route.resourceStatement = statement + case "span": + statement, err := r.spanParser.ParseStatement(item.Statement) + if err != nil { + return err + } + route.spanStatement = statement case "metric": statement, err := r.metricParser.ParseStatement(item.Statement) if err != nil { diff --git a/connector/routingconnector/traces.go b/connector/routingconnector/traces.go index a82ee85a9973..7df2effd74f0 100644 --- a/connector/routingconnector/traces.go +++ b/connector/routingconnector/traces.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/ptraceutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" ) type tracesConnector struct { @@ -88,6 +89,15 @@ func (c *tracesConnector) switchTraces(ctx context.Context, td ptrace.Traces) er return isMatch }, ) + case "span": + ptraceutil.MoveSpansWithContextIf(td, matchedSpans, + func(rs ptrace.ResourceSpans, ss ptrace.ScopeSpans, s ptrace.Span) bool { + mtx := ottlspan.NewTransformContext(s, ss.Scope(), rs.Resource(), ss, rs) + _, isMatch, err := route.spanStatement.Execute(ctx, mtx) + errs = errors.Join(errs, err) + return isMatch + }, + ) } if errs != nil { if c.config.ErrorMode == ottl.PropagateError { diff --git a/connector/routingconnector/traces_test.go b/connector/routingconnector/traces_test.go index 291e8fd230af..78f7e46f414a 100644 --- a/connector/routingconnector/traces_test.go +++ b/connector/routingconnector/traces_test.go @@ -429,12 +429,21 @@ func TestTracesConnectorDetailed(t *testing.T) { isAcme := `request["X-Tenant"] == "acme"` - isAnyResource := `attributes["resourceName"] != nil` isResourceA := `attributes["resourceName"] == "resourceA"` isResourceB := `attributes["resourceName"] == "resourceB"` isResourceX := `attributes["resourceName"] == "resourceX"` isResourceY := `attributes["resourceName"] == "resourceY"` + isSpanE := `name == "spanE"` + isSpanF := `name == "spanF"` + isSpanX := `name == "spanX"` + isSpanY := `name == "spanY"` + + isScopeCFromLowerContext := `instrumentation_scope.name == "scopeC"` + isScopeDFromLowerContext := `instrumentation_scope.name == "scopeD"` + + isResourceBFromLowerContext := `resource.attributes["resourceName"] == "resourceB"` + testCases := []struct { name string cfg *Config @@ -537,7 +546,7 @@ func TestTracesConnectorDetailed(t *testing.T) { { name: "resource/all_match_first_only", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), + withRoute("resource", "true", idSink0), withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), @@ -550,7 +559,7 @@ func TestTracesConnectorDetailed(t *testing.T) { name: "resource/all_match_last_only", cfg: testConfig( withRoute("resource", isResourceX, idSink0), - withRoute("resource", isAnyResource, idSink1), + withRoute("resource", "true", idSink1), withDefault(idSinkD), ), input: ptraceutiltest.NewTraces("AB", "CD", "EF", "FG"), @@ -561,7 +570,7 @@ func TestTracesConnectorDetailed(t *testing.T) { { name: "resource/all_match_only_once", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), + withRoute("resource", "true", idSink0), withRoute("resource", isResourceA+" or "+isResourceB, idSink1), withDefault(idSinkD), ), @@ -628,6 +637,169 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSink1: ptrace.Traces{}, expectSinkD: ptrace.Traces{}, }, + { + name: "span/all_match_first_only", + cfg: testConfig( + withRoute("span", "true", idSink0), + withRoute("span", isSpanY, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/all_match_last_only", + cfg: testConfig( + withRoute("span", isSpanX, idSink0), + withRoute("span", "true", idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/all_match_only_once", + cfg: testConfig( + withRoute("span", "true", idSink0), + withRoute("span", isSpanE+" or "+isSpanF, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/each_matches_one", + cfg: testConfig( + withRoute("span", isSpanE, idSink0), + withRoute("span", isSpanF, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/some_match_with_default", + cfg: testConfig( + withRoute("span", isSpanX, idSink0), + withRoute("span", isSpanF, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + }, + { + name: "span/some_match_without_default", + cfg: testConfig( + withRoute("span", isSpanX, idSink0), + withRoute("span", isSpanF, idSink1), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/match_none_with_default", + cfg: testConfig( + withRoute("span", isSpanX, idSink0), + withRoute("span", isSpanY, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + }, + { + name: "span/match_none_without_default", + cfg: testConfig( + withRoute("span", isSpanX, idSink0), + withRoute("span", isSpanY, idSink1), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptrace.Traces{}, + expectSink1: ptrace.Traces{}, + expectSinkD: ptrace.Traces{}, + }, + { + name: "span/with_resource_condition", + cfg: testConfig( + withRoute("span", isResourceBFromLowerContext, idSink0), + withRoute("span", isSpanY, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"), + }, + { + name: "span/with_scope_condition", + cfg: testConfig( + withRoute("span", isScopeCFromLowerContext, idSink0), + withRoute("span", isSpanY, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "C", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTraces("AB", "D", "EF", "GH"), + }, + { + name: "span/with_resource_and_scope_conditions", + cfg: testConfig( + withRoute("span", isResourceBFromLowerContext+" and "+isScopeDFromLowerContext, idSink0), + withRoute("span", isSpanY, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("B", "D", "EF", "GH"), + expectSink1: ptrace.Traces{}, + expectSinkD: ptraceutiltest.NewTracesFromOpts( + ptraceutiltest.WithResource('A', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ptraceutiltest.WithScope('D', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ptraceutiltest.WithResource('B', + ptraceutiltest.WithScope('C', ptraceutiltest.WithSpan('E', "GH"), ptraceutiltest.WithSpan('F', "GH")), + ), + ), + }, + { + name: "mixed/match_resource_then_metrics", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("span", isSpanE, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("A", "CD", "EF", "GH"), + expectSink1: ptraceutiltest.NewTraces("B", "CD", "E", "GH"), + expectSinkD: ptraceutiltest.NewTraces("B", "CD", "F", "GH"), + }, + { + name: "mixed/match_metrics_then_resource", + cfg: testConfig( + withRoute("span", isSpanE, idSink0), + withRoute("resource", isResourceB, idSink1), + withDefault(idSinkD), + ), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + expectSink1: ptraceutiltest.NewTraces("B", "CD", "F", "GH"), + expectSinkD: ptraceutiltest.NewTraces("A", "CD", "F", "GH"), + }, + { name: "mixed/match_resource_then_grpc_request", cfg: testConfig( @@ -641,6 +813,19 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSink1: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), expectSinkD: ptrace.Traces{}, }, + { + name: "mixed/match_metrics_then_grpc_request", + cfg: testConfig( + withRoute("span", isSpanF, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + expectSinkD: ptrace.Traces{}, + }, { name: "mixed/match_resource_then_http_request", cfg: testConfig( @@ -654,6 +839,19 @@ func TestTracesConnectorDetailed(t *testing.T) { expectSink1: ptraceutiltest.NewTraces("B", "CD", "EF", "GH"), expectSinkD: ptrace.Traces{}, }, + { + name: "mixed/match_metrics_then_http_request", + cfg: testConfig( + withRoute("span", isSpanF, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: ptraceutiltest.NewTraces("AB", "CD", "EF", "GH"), + expectSink0: ptraceutiltest.NewTraces("AB", "CD", "F", "GH"), + expectSink1: ptraceutiltest.NewTraces("AB", "CD", "E", "GH"), + expectSinkD: ptrace.Traces{}, + }, } for _, tt := range testCases {