From dcf42df62949c91272e7fdd680ff486681381991 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 8 Jan 2025 22:31:16 -0600 Subject: [PATCH 1/3] [connector/routing] Disconnect 'match_once' parameter (#37095) --- .../routing-disconnect-matchonce-api.yaml | 29 ++++++++ .chloggen/routing-disconnect-matchonce.yaml | 28 ++++++++ connector/routingconnector/config.go | 8 +-- connector/routingconnector/config_test.go | 68 ------------------- connector/routingconnector/factory.go | 1 - connector/routingconnector/logs.go | 49 +------------ connector/routingconnector/logs_test.go | 52 -------------- connector/routingconnector/metrics.go | 48 +------------ connector/routingconnector/metrics_test.go | 60 ---------------- connector/routingconnector/traces.go | 47 +------------ connector/routingconnector/traces_test.go | 26 ------- 11 files changed, 65 insertions(+), 351 deletions(-) create mode 100644 .chloggen/routing-disconnect-matchonce-api.yaml create mode 100644 .chloggen/routing-disconnect-matchonce.yaml diff --git a/.chloggen/routing-disconnect-matchonce-api.yaml b/.chloggen/routing-disconnect-matchonce-api.yaml new file mode 100644 index 000000000000..74cc12e4925e --- /dev/null +++ b/.chloggen/routing-disconnect-matchonce-api.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change `match_once` parameter from `bool` to `*bool`. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29882] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Boolean values should still unmarshal successfully, but direct instantiation in code will fail. + The change allows us to check for usage and warn of the upcoming removal in v0.120.0. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/routing-disconnect-matchonce.yaml b/.chloggen/routing-disconnect-matchonce.yaml new file mode 100644 index 000000000000..9b72477bcbdd --- /dev/null +++ b/.chloggen/routing-disconnect-matchonce.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Disconnect `match_once` parameter from functionality. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29882] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The parameter will be ignored, except to trigger a warning log about its upcoming removal in v0.120.0. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/routingconnector/config.go b/connector/routingconnector/config.go index b3d5add7160e..916a9d267e95 100644 --- a/connector/routingconnector/config.go +++ b/connector/routingconnector/config.go @@ -43,8 +43,8 @@ type Config struct { Table []RoutingTableItem `mapstructure:"table"` // MatchOnce determines whether the connector matches multiple statements. - // Optional. - MatchOnce bool `mapstructure:"match_once"` + // Unused. Deprecated in v0.116.0. Will be removed in v0.120.0. + MatchOnce *bool `mapstructure:"match_once"` } // Validate checks if the processor configuration is valid. @@ -77,10 +77,6 @@ func (c *Config) Validate() error { return err } fallthrough - case "span", "metric", "datapoint", "log": // ok - if !c.MatchOnce { - return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context) - } default: return errors.New("invalid context: " + item.Context) } diff --git a/connector/routingconnector/config_test.go b/connector/routingconnector/config_test.go index 4e001ed6bd7f..0e4e54d19d22 100644 --- a/connector/routingconnector/config_test.go +++ b/connector/routingconnector/config_test.go @@ -27,7 +27,6 @@ func TestLoadConfig(t *testing.T) { configPath: filepath.Join("testdata", "config", "traces.yaml"), id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ - MatchOnce: true, DefaultPipelines: []pipeline.ID{ pipeline.NewIDWithName(pipeline.SignalTraces, "otlp-all"), }, @@ -53,7 +52,6 @@ func TestLoadConfig(t *testing.T) { configPath: filepath.Join("testdata", "config", "metrics.yaml"), id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ - MatchOnce: true, DefaultPipelines: []pipeline.ID{ pipeline.NewIDWithName(pipeline.SignalMetrics, "otlp-all"), }, @@ -79,7 +77,6 @@ func TestLoadConfig(t *testing.T) { configPath: filepath.Join("testdata", "config", "logs.yaml"), id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ - MatchOnce: true, DefaultPipelines: []pipeline.ID{ pipeline.NewIDWithName(pipeline.SignalLogs, "otlp-all"), }, @@ -221,70 +218,6 @@ func TestValidateConfig(t *testing.T) { }, error: "invalid context: invalid", }, - { - name: "span context with match_once false", - config: &Config{ - MatchOnce: false, - Table: []RoutingTableItem{ - { - Context: "span", - Statement: `route() where attributes["attr"] == "acme"`, - Pipelines: []pipeline.ID{ - pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), - }, - }, - }, - }, - error: `"span" context is not supported with "match_once: false"`, - }, - { - name: "metric context with match_once false", - config: &Config{ - MatchOnce: false, - Table: []RoutingTableItem{ - { - Context: "metric", - Statement: `route() where attributes["attr"] == "acme"`, - Pipelines: []pipeline.ID{ - pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), - }, - }, - }, - }, - error: `"metric" context is not supported with "match_once: false"`, - }, - { - name: "datapoint context with match_once false", - config: &Config{ - MatchOnce: false, - Table: []RoutingTableItem{ - { - Context: "datapoint", - Statement: `route() where attributes["attr"] == "acme"`, - Pipelines: []pipeline.ID{ - pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), - }, - }, - }, - }, - error: `"datapoint" context is not supported with "match_once: false"`, - }, - { - name: "log context with match_once false", - config: &Config{ - MatchOnce: false, - Table: []RoutingTableItem{ - { - Context: "log", - Statement: `route() where attributes["attr"] == "acme"`, - Pipelines: []pipeline.ID{ - pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), - }, - }, - }, - }, - error: `"log" context is not supported with "match_once: false"`, - }, { name: "request context with statement", config: &Config{ @@ -349,7 +282,6 @@ func withDefault(pipelines ...pipeline.ID) testConfigOption { func testConfig(opts ...testConfigOption) *Config { cfg := createDefaultConfig().(*Config) - cfg.MatchOnce = true for _, opt := range opts { opt(cfg) } diff --git a/connector/routingconnector/factory.go b/connector/routingconnector/factory.go index aaa569cb72b9..ad4bc4a23fb4 100644 --- a/connector/routingconnector/factory.go +++ b/connector/routingconnector/factory.go @@ -31,7 +31,6 @@ func NewFactory() connector.Factory { func createDefaultConfig() component.Config { return &Config{ ErrorMode: ottl.PropagateError, - MatchOnce: true, } } diff --git a/connector/routingconnector/logs.go b/connector/routingconnector/logs.go index 1c8d1923247a..040bcd145312 100644 --- a/connector/routingconnector/logs.go +++ b/connector/routingconnector/logs.go @@ -35,8 +35,8 @@ func newLogsConnector( ) (*logsConnector, error) { cfg := config.(*Config) - if !cfg.MatchOnce { - set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.") + if cfg.MatchOnce != nil { + set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.") } lr, ok := logs.(connector.LogsRouterAndConsumer) @@ -65,15 +65,6 @@ func (c *logsConnector) Capabilities() consumer.Capabilities { } func (c *logsConnector) ConsumeLogs(ctx context.Context, ld plog.Logs) error { - if c.config.MatchOnce { - return c.switchLogs(ctx, ld) - } - return c.matchAllLogs(ctx, ld) -} - -// switchLogs removes items from the original plog.Logs as they are matched, -// and sends them to the appropriate consumer. -func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error { groups := make(map[consumer.Logs]plog.Logs) var errs error for i := 0; i < len(c.router.routeSlice) && ld.ResourceLogs().Len() > 0; i++ { @@ -120,42 +111,6 @@ func (c *logsConnector) switchLogs(ctx context.Context, ld plog.Logs) error { return errs } -func (c *logsConnector) matchAllLogs(ctx context.Context, ld plog.Logs) error { - // routingEntry is used to group plog.ResourceLogs that are routed to - // the same set of exporters. - // This way we're not ending up with all the logs split up which would cause - // higher CPU usage. - groups := make(map[consumer.Logs]plog.Logs) - var errs error - for i := 0; i < ld.ResourceLogs().Len(); i++ { - rlogs := ld.ResourceLogs().At(i) - rtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs) - noRoutesMatch := true - for _, route := range c.router.routeSlice { - _, isMatch, err := route.resourceStatement.Execute(ctx, rtx) - if err != nil { - if c.config.ErrorMode == ottl.PropagateError { - return err - } - groupLogs(groups, c.router.defaultConsumer, rlogs) - continue - } - if isMatch { - noRoutesMatch = false - groupLogs(groups, route.consumer, rlogs) - } - } - if noRoutesMatch { - // no route conditions are matched, add resource logs to default exporters group - groupLogs(groups, c.router.defaultConsumer, rlogs) - } - } - for consumer, group := range groups { - errs = errors.Join(errs, consumer.ConsumeLogs(ctx, group)) - } - return errs -} - func groupAllLogs( groups map[consumer.Logs]plog.Logs, cons consumer.Logs, diff --git a/connector/routingconnector/logs_test.go b/connector/routingconnector/logs_test.go index 3cf9ca03321a..19e6e6947a81 100644 --- a/connector/routingconnector/logs_test.go +++ b/connector/routingconnector/logs_test.go @@ -160,57 +160,6 @@ func TestLogsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { assert.Empty(t, sink1.AllLogs()) }) - t.Run("logs matched by two expressions", func(t *testing.T) { - resetSinks() - - l := plog.NewLogs() - - rl := l.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("X-Tenant", "x_acme") - rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - - rl = l.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("X-Tenant", "_acme") - rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - - require.NoError(t, conn.ConsumeLogs(context.Background(), l)) - - assert.Empty(t, defaultSink.AllLogs()) - assert.Len(t, sink0.AllLogs(), 1) - assert.Len(t, sink1.AllLogs(), 1) - - assert.Equal(t, 2, sink0.AllLogs()[0].LogRecordCount()) - assert.Equal(t, 2, sink1.AllLogs()[0].LogRecordCount()) - assert.Equal(t, sink0.AllLogs(), sink1.AllLogs()) - }) - - t.Run("one log matched by multiple expressions, other matched none", func(t *testing.T) { - resetSinks() - - l := plog.NewLogs() - - rl := l.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("X-Tenant", "_acme") - rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - - rl = l.ResourceLogs().AppendEmpty() - rl.Resource().Attributes().PutStr("X-Tenant", "something-else") - rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - - require.NoError(t, conn.ConsumeLogs(context.Background(), l)) - - assert.Len(t, defaultSink.AllLogs(), 1) - assert.Len(t, sink0.AllLogs(), 1) - assert.Len(t, sink1.AllLogs(), 1) - - assert.Equal(t, sink0.AllLogs(), sink1.AllLogs()) - - rlog := defaultSink.AllLogs()[0].ResourceLogs().At(0) - attr, ok := rlog.Resource().Attributes().Get("X-Tenant") - assert.True(t, ok, "routing attribute must exists") - assert.Equal(t, "something-else", attr.AsString()) - }) - t.Run("logs matched by one expression, multiple pipelines", func(t *testing.T) { resetSinks() @@ -253,7 +202,6 @@ func TestLogsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { Pipelines: []pipeline.ID{logsDefault, logs0}, }, }, - MatchOnce: true, } var defaultSink, sink0, sink1 consumertest.LogsSink diff --git a/connector/routingconnector/metrics.go b/connector/routingconnector/metrics.go index 51f08f5ff2d4..9141e264703b 100644 --- a/connector/routingconnector/metrics.go +++ b/connector/routingconnector/metrics.go @@ -36,8 +36,8 @@ func newMetricsConnector( ) (*metricsConnector, error) { cfg := config.(*Config) - if !cfg.MatchOnce { - set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.") + if cfg.MatchOnce != nil { + set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.") } mr, ok := metrics.(connector.MetricsRouterAndConsumer) @@ -66,13 +66,6 @@ func (c *metricsConnector) Capabilities() consumer.Capabilities { } func (c *metricsConnector) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - if c.config.MatchOnce { - return c.switchMetrics(ctx, md) - } - return c.matchAllMetrics(ctx, md) -} - -func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics) error { groups := make(map[consumer.Metrics]pmetric.Metrics) var errs error for i := 0; i < len(c.router.routeSlice) && md.ResourceMetrics().Len() > 0; i++ { @@ -128,43 +121,6 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics return errs } -func (c *metricsConnector) matchAllMetrics(ctx context.Context, md pmetric.Metrics) error { - // groups is used to group pmetric.ResourceMetrics that are routed to - // the same set of exporters. This way we're not ending up with all the - // metrics split up which would cause higher CPU usage. - groups := make(map[consumer.Metrics]pmetric.Metrics) - - var errs error - for i := 0; i < md.ResourceMetrics().Len(); i++ { - rmetrics := md.ResourceMetrics().At(i) - rtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics) - - noRoutesMatch := true - for _, route := range c.router.routeSlice { - _, isMatch, err := route.resourceStatement.Execute(ctx, rtx) - if err != nil { - if c.config.ErrorMode == ottl.PropagateError { - return err - } - groupMetrics(groups, c.router.defaultConsumer, rmetrics) - continue - } - if isMatch { - noRoutesMatch = false - groupMetrics(groups, route.consumer, rmetrics) - } - } - if noRoutesMatch { - // no route conditions are matched, add resource metrics to default exporters group - groupMetrics(groups, c.router.defaultConsumer, rmetrics) - } - } - for consumer, group := range groups { - errs = errors.Join(errs, consumer.ConsumeMetrics(ctx, group)) - } - return errs -} - func groupAllMetrics( groups map[consumer.Metrics]pmetric.Metrics, cons consumer.Metrics, diff --git a/connector/routingconnector/metrics_test.go b/connector/routingconnector/metrics_test.go index 2f8335980940..a0fb90467d7f 100644 --- a/connector/routingconnector/metrics_test.go +++ b/connector/routingconnector/metrics_test.go @@ -165,65 +165,6 @@ func TestMetricsAreCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { assert.Empty(t, sink1.AllMetrics()) }) - t.Run("metric matched by two expressions", func(t *testing.T) { - resetSinks() - - m := pmetric.NewMetrics() - - rm := m.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutDouble("value", 5.0) - metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric.SetEmptyGauge() - metric.SetName("cpu") - - rm = m.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutDouble("value", 3.1) - metric = rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric.SetEmptyGauge() - metric.SetName("cpu1") - - require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) - - assert.Empty(t, defaultSink.AllMetrics()) - assert.Len(t, sink0.AllMetrics(), 1) - assert.Len(t, sink1.AllMetrics(), 1) - - assert.Equal(t, 2, sink0.AllMetrics()[0].MetricCount()) - assert.Equal(t, 2, sink1.AllMetrics()[0].MetricCount()) - assert.Equal(t, sink0.AllMetrics(), sink1.AllMetrics()) - }) - - t.Run("one metric matched by 2 expressions, others matched by none", func(t *testing.T) { - resetSinks() - - m := pmetric.NewMetrics() - - rm := m.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutDouble("value", 5.0) - metric := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric.SetEmptyGauge() - metric.SetName("cpu") - - rm = m.ResourceMetrics().AppendEmpty() - rm.Resource().Attributes().PutDouble("value", -1.0) - metric = rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() - metric.SetEmptyGauge() - metric.SetName("cpu1") - - require.NoError(t, conn.ConsumeMetrics(context.Background(), m)) - - assert.Len(t, defaultSink.AllMetrics(), 1) - assert.Len(t, sink0.AllMetrics(), 1) - assert.Len(t, sink1.AllMetrics(), 1) - - assert.Equal(t, sink0.AllMetrics(), sink1.AllMetrics()) - - rmetric := defaultSink.AllMetrics()[0].ResourceMetrics().At(0) - attr, ok := rmetric.Resource().Attributes().Get("value") - assert.True(t, ok, "routing attribute must exist") - assert.Equal(t, attr.Double(), float64(-1.0)) - }) - t.Run("metric matched by one expression, multiple pipelines", func(t *testing.T) { resetSinks() @@ -268,7 +209,6 @@ func TestMetricsAreCorrectlyMatchOnceWithOTTL(t *testing.T) { Pipelines: []pipeline.ID{metricsDefault, metrics0}, }, }, - MatchOnce: true, } var defaultSink, sink0, sink1 consumertest.MetricsSink diff --git a/connector/routingconnector/traces.go b/connector/routingconnector/traces.go index a8824a16a491..fb9350e5abeb 100644 --- a/connector/routingconnector/traces.go +++ b/connector/routingconnector/traces.go @@ -35,8 +35,8 @@ func newTracesConnector( ) (*tracesConnector, error) { cfg := config.(*Config) - if !cfg.MatchOnce { - set.Logger.Error("The 'match_once' field has been deprecated and will be removed in v0.120.0. Remove usage of the parameter to suppress this warning.") + if cfg.MatchOnce != nil { + set.Logger.Error("The 'match_once' field has been deprecated and no longer has any effect. It will be removed in v0.120.0.") } tr, ok := traces.(connector.TracesRouterAndConsumer) @@ -65,13 +65,6 @@ func (*tracesConnector) Capabilities() consumer.Capabilities { } func (c *tracesConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - if c.config.MatchOnce { - return c.switchTraces(ctx, td) - } - return c.matchAllTraces(ctx, td) -} - -func (c *tracesConnector) switchTraces(ctx context.Context, td ptrace.Traces) error { groups := make(map[consumer.Traces]ptrace.Traces) var errs error for i := 0; i < len(c.router.routeSlice) && td.ResourceSpans().Len() > 0; i++ { @@ -118,42 +111,6 @@ func (c *tracesConnector) switchTraces(ctx context.Context, td ptrace.Traces) er return errs } -func (c *tracesConnector) matchAllTraces(ctx context.Context, td ptrace.Traces) error { - // groups is used to group ptrace.ResourceSpans that are routed to - // the same set of pipelines. This way we're not ending up with all the - // spans split up which would cause higher CPU usage. - groups := make(map[consumer.Traces]ptrace.Traces) - - var errs error - for i := 0; i < td.ResourceSpans().Len(); i++ { - rspans := td.ResourceSpans().At(i) - rtx := ottlresource.NewTransformContext(rspans.Resource(), rspans) - noRoutesMatch := true - for _, route := range c.router.routeSlice { - _, isMatch, err := route.resourceStatement.Execute(ctx, rtx) - if err != nil { - if c.config.ErrorMode == ottl.PropagateError { - return err - } - groupTraces(groups, c.router.defaultConsumer, rspans) - continue - } - if isMatch { - noRoutesMatch = false - groupTraces(groups, route.consumer, rspans) - } - } - if noRoutesMatch { - // no route conditions are matched, add resource spans to default pipelines group - groupTraces(groups, c.router.defaultConsumer, rspans) - } - } - for consumer, group := range groups { - errs = errors.Join(errs, consumer.ConsumeTraces(ctx, group)) - } - return errs -} - func groupAllTraces( groups map[consumer.Traces]ptrace.Traces, cons consumer.Traces, diff --git a/connector/routingconnector/traces_test.go b/connector/routingconnector/traces_test.go index fb597e2a6968..dd70f958f167 100644 --- a/connector/routingconnector/traces_test.go +++ b/connector/routingconnector/traces_test.go @@ -161,31 +161,6 @@ func TestTracesCorrectlySplitPerResourceAttributeWithOTTL(t *testing.T) { assert.Empty(t, sink1.AllTraces()) }) - t.Run("span matched by all expressions", func(t *testing.T) { - resetSinks() - - tr := ptrace.NewTraces() - rl := tr.ResourceSpans().AppendEmpty() - rl.Resource().Attributes().PutInt("value", 2) - span := rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() - span.SetName("span") - - rl = tr.ResourceSpans().AppendEmpty() - rl.Resource().Attributes().PutInt("value", 3) - span = rl.ScopeSpans().AppendEmpty().Spans().AppendEmpty() - span.SetName("span1") - - require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) - - assert.Empty(t, defaultSink.AllTraces()) - assert.Len(t, sink0.AllTraces(), 1) - assert.Len(t, sink1.AllTraces(), 1) - - assert.Equal(t, 2, sink0.AllTraces()[0].SpanCount()) - assert.Equal(t, 2, sink1.AllTraces()[0].SpanCount()) - assert.Equal(t, sink0.AllTraces(), sink1.AllTraces()) - }) - t.Run("span matched by one expression, multiple pipelines", func(t *testing.T) { resetSinks() @@ -214,7 +189,6 @@ func TestTracesCorrectlyMatchOnceWithOTTL(t *testing.T) { cfg := &Config{ DefaultPipelines: []pipeline.ID{tracesDefault}, - MatchOnce: true, Table: []RoutingTableItem{ { Statement: `route() where attributes["value"] > 0 and attributes["value"] < 4`, From c645253938d3c902d3b2db93c0beb133f8fdff43 Mon Sep 17 00:00:00 2001 From: Zixin Zhou Date: Thu, 9 Jan 2025 12:32:13 +0800 Subject: [PATCH 2/3] [chore] add more Windows versions for testing (#37025) --- .github/workflows/build-and-test-windows.yml | 3 ++- .github/workflows/e2e-tests-windows.yml | 18 +++++++++++++++--- .../exporter_concurrency_test.go | 15 +++++++++++++++ 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build-and-test-windows.yml b/.github/workflows/build-and-test-windows.yml index dafbfe3838a8..b4d56ec35f48 100644 --- a/.github/workflows/build-and-test-windows.yml +++ b/.github/workflows/build-and-test-windows.yml @@ -45,7 +45,8 @@ jobs: - cmd-0 - cmd-1 - other - runs-on: windows-latest + os: [windows-2022, windows-2025] + runs-on: ${{ matrix.os }} if: ${{ github.actor != 'dependabot[bot]' && (contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') }} env: # Limit memory usage via GC environment variables to avoid OOM on GH runners, especially for `cmd/otelcontribcol`, diff --git a/.github/workflows/e2e-tests-windows.yml b/.github/workflows/e2e-tests-windows.yml index bcfb4f38d865..69b3f8ac8027 100644 --- a/.github/workflows/e2e-tests-windows.yml +++ b/.github/workflows/e2e-tests-windows.yml @@ -32,7 +32,11 @@ jobs: - run: echo $(./.github/workflows/scripts/is_changed_file_windows.sh ${{ github.event.pull_request.base.sha }} ${{ github.sha }} ) collector-build: - runs-on: windows-latest + strategy: + fail-fast: false + matrix: + os: [windows-latest] + runs-on: ${{ matrix.os }} needs: [windows-file-changed] if: ${{ github.actor != 'dependabot[bot]' && ((contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') || needs.windows-file-changed.outputs.changed == 'true') }} steps: @@ -65,7 +69,11 @@ jobs: path: ./bin/* supervisor-test: - runs-on: windows-latest + strategy: + fail-fast: false + matrix: + os: [windows-2022, windows-2025] + runs-on: ${{ matrix.os }} if: ${{ github.actor != 'dependabot[bot]' && (contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') }} needs: [collector-build] steps: @@ -97,7 +105,11 @@ jobs: go test -v --tags=e2e windows-supervisor-service-test: - runs-on: windows-latest + strategy: + fail-fast: false + matrix: + os: [windows-2022, windows-2025] + runs-on: ${{ matrix.os }} if: ${{ github.actor != 'dependabot[bot]' && (contains(github.event.pull_request.labels.*.name, 'Run Windows') || github.event_name == 'push' || github.event_name == 'merge_group') }} needs: [collector-build] steps: diff --git a/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go index 689cbcf7b9fc..bf9fcbd968cc 100644 --- a/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/http/httptest" + "os" "strconv" "sync" "testing" @@ -31,6 +32,9 @@ import ( // Test everything works when there is more than one goroutine calling PushMetrics. // Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes. func Test_PushMetricsConcurrent(t *testing.T) { + if os.Getenv("ImageOs") == "win25" && os.Getenv("GITHUB_ACTIONS") == "true" { + t.Skip("Skipping test on Windows 2025 GH runners, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37104") + } n := 1000 ms := make([]pmetric.Metrics, n) testIDKey := "test_id" @@ -52,6 +56,10 @@ func Test_PushMetricsConcurrent(t *testing.T) { t.Fatal(err) } assert.NotNil(t, body) + if len(body) == 0 { + // No content, nothing to do. The request is just checking that the server is up. + return + } // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) @@ -124,6 +132,13 @@ func Test_PushMetricsConcurrent(t *testing.T) { require.NoError(t, prwe.Shutdown(ctx)) }() + // Ensure that the test server is up before making the requests + assert.EventuallyWithT(t, func(c *assert.CollectT) { + resp, checkRequestErr := http.Get(server.URL) + require.NoError(c, checkRequestErr) + assert.NoError(c, resp.Body.Close()) + }, 5*time.Second, 100*time.Millisecond) + var wg sync.WaitGroup wg.Add(n) for _, m := range ms { From 5beb8187ff78f934cb9e326f87e751e5ae199085 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Thu, 9 Jan 2025 12:23:11 +0100 Subject: [PATCH 3/3] [elasticsearchexporter] Handle EventName (#37012) closes #37011 --------- Co-authored-by: Carson Ip --- .../elasticsearchexporter_event-name.yaml | 27 +++++++++++++++++++ .../elasticsearchexporter/exporter_test.go | 6 ++--- exporter/elasticsearchexporter/model.go | 16 ++++++++--- exporter/elasticsearchexporter/model_test.go | 27 +++++++++++++++++++ 4 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_event-name.yaml diff --git a/.chloggen/elasticsearchexporter_event-name.yaml b/.chloggen/elasticsearchexporter_event-name.yaml new file mode 100644 index 000000000000..8887665616e8 --- /dev/null +++ b/.chloggen/elasticsearchexporter_event-name.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Handle `EventName` for log records in OTel mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37011] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 5554c089e02b..33a0cf6d1383 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -450,7 +450,7 @@ func TestExporterLogs(t *testing.T) { return vm }(), isEvent: true, - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"true":true,"false":false,"inner":{"foo":"bar"}}}}`), }, { body: func() pcommon.Value { @@ -473,7 +473,7 @@ func TestExporterLogs(t *testing.T) { return vs }(), isEvent: true, - wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`), + wantDocument: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"attr.foo":"attr.foo.value","event.name":"foo"},"event_name":"foo","data_stream":{"dataset":"attr.dataset.otel","namespace":"resource.attribute.namespace","type":"logs"},"dropped_attributes_count":0,"observed_timestamp":"1970-01-01T00:00:00.000000000Z","resource":{"attributes":{"resource.attr.foo":"resource.attr.foo.value"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0},"severity_number":0,"body":{"structured":{"value":["foo",false,{"foo":"bar"}]}}}`), }, } { rec := newBulkRecorder() @@ -1629,7 +1629,7 @@ func TestExporterTraces(t *testing.T) { }, { Action: []byte(`{"create":{"_index":"logs-generic.otel-default"}}`), - Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), + Document: []byte(`{"@timestamp":"1970-01-01T00:00:00.000000000Z","attributes":{"event.attr.foo":"event.attr.bar","event.name":"exception"},"event_name":"exception","data_stream":{"dataset":"generic.otel","namespace":"default","type":"logs"},"dropped_attributes_count":1,"resource":{"attributes":{"resource.foo":"resource.bar"},"dropped_attributes_count":0},"scope":{"dropped_attributes_count":0}}`), }, } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index a5d9fff13892..f1c0e615f8eb 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -178,21 +178,29 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem document.AddInt("severity_number", int64(record.SeverityNumber())) document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount())) + if record.EventName() != "" { + document.AddString("event_name", record.EventName()) + } else if eventNameAttr, ok := record.Attributes().Get("event.name"); ok && eventNameAttr.Str() != "" { + document.AddString("event_name", eventNameAttr.Str()) + } + m.encodeAttributesOTelMode(&document, record.Attributes()) m.encodeResourceOTelMode(&document, resource, resourceSchemaURL) m.encodeScopeOTelMode(&document, scope, scopeSchemaURL) // Body - setOTelLogBody(&document, record.Body(), record.Attributes()) + setOTelLogBody(&document, record) return document } -func setOTelLogBody(doc *objmodel.Document, body pcommon.Value, attributes pcommon.Map) { +func setOTelLogBody(doc *objmodel.Document, record plog.LogRecord) { // Determine if this log record is an event, as they are mapped differently // https://github.com/open-telemetry/semantic-conventions/blob/main/docs/general/events.md - _, isEvent := attributes.Get("event.name") + _, isEvent := record.Attributes().Get("event.name") + isEvent = isEvent || record.EventName() != "" + body := record.Body() switch body.Type() { case pcommon.ValueTypeMap: if isEvent { @@ -734,6 +742,8 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU } var document objmodel.Document document.AddTimestamp("@timestamp", spanEvent.Timestamp()) + document.AddString("event_name", spanEvent.Name()) + // todo remove before GA, make sure Kibana uses event_name document.AddString("attributes.event.name", spanEvent.Name()) document.AddSpanID("span_id", span.SpanID()) document.AddTraceID("trace_id", span.TraceID()) diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index eda750a540e7..dddf46a14a01 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -908,6 +908,7 @@ type OTelRecord struct { ObservedTimestamp time.Time `json:"observed_timestamp"` SeverityNumber int32 `json:"severity_number"` SeverityText string `json:"severity_text"` + EventName string `json:"event_name"` Attributes map[string]any `json:"attributes"` DroppedAttributesCount uint32 `json:"dropped_attributes_count"` Scope OTelScope `json:"scope"` @@ -1076,6 +1077,30 @@ func TestEncodeLogOtelMode(t *testing.T) { return assignDatastreamData(or, "", ds, ns) }, }, + { + name: "event_name from attributes.event.name", + rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord { + or.Attributes["event.name"] = "foo" + or.EventName = "" + return or + }), + wantFn: func(or OTelRecord) OTelRecord { + or.EventName = "foo" + return assignDatastreamData(or) + }, + }, + { + name: "event_name takes precedent over attributes.event.name", + rec: buildOTelRecordTestData(t, func(or OTelRecord) OTelRecord { + or.Attributes["event.name"] = "foo" + or.EventName = "bar" + return or + }), + wantFn: func(or OTelRecord) OTelRecord { + or.EventName = "bar" + return assignDatastreamData(or) + }, + }, } m := encodeModel{ @@ -1117,6 +1142,7 @@ func createTestOTelLogRecord(t *testing.T, rec OTelRecord) (plog.LogRecord, pcom record.SetSeverityNumber(plog.SeverityNumber(rec.SeverityNumber)) record.SetSeverityText(rec.SeverityText) record.SetDroppedAttributesCount(rec.DroppedAttributesCount) + record.SetEventName(rec.EventName) err := record.Attributes().FromRaw(rec.Attributes) require.NoError(t, err) @@ -1143,6 +1169,7 @@ func buildOTelRecordTestData(t *testing.T, fn func(OTelRecord) OTelRecord) OTelR "event.name": "user-password-change", "foo.some": "bar" }, + "event_name": "user-password-change", "dropped_attributes_count": 1, "observed_timestamp": "2024-03-12T20:00:41.123456789Z", "resource": {