diff --git a/processor/filterprocessor/go.mod b/processor/filterprocessor/go.mod index 7d9f5df275cc..008f0c5edef7 100644 --- a/processor/filterprocessor/go.mod +++ b/processor/filterprocessor/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/collector/consumer v0.113.1-0.20241115165626-8b99b8023ca3 go.opentelemetry.io/collector/consumer/consumertest v0.113.1-0.20241115165626-8b99b8023ca3 go.opentelemetry.io/collector/pdata v1.19.1-0.20241115165626-8b99b8023ca3 + go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3 go.opentelemetry.io/collector/processor v0.113.1-0.20241115165626-8b99b8023ca3 go.opentelemetry.io/collector/processor/processortest v0.113.1-0.20241115165626-8b99b8023ca3 go.opentelemetry.io/otel v1.32.0 @@ -64,7 +65,6 @@ require ( go.opentelemetry.io/collector/featuregate v1.19.1-0.20241115165626-8b99b8023ca3 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.113.1-0.20241115165626-8b99b8023ca3 // indirect go.opentelemetry.io/collector/pdata/testdata v0.113.1-0.20241115165626-8b99b8023ca3 // indirect - go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.113.1-0.20241115165626-8b99b8023ca3 // indirect go.opentelemetry.io/collector/semconv v0.113.1-0.20241115165626-8b99b8023ca3 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect diff --git a/processor/filterprocessor/logs.go b/processor/filterprocessor/logs.go index d3e3ed2bb103..7cd8be05ec1c 100644 --- a/processor/filterprocessor/logs.go +++ b/processor/filterprocessor/logs.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/multierr" @@ -22,7 +23,7 @@ import ( type filterLogProcessor struct { skipExpr expr.BoolExpr[ottllog.TransformContext] - telemetry *filterProcessorTelemetry + telemetry *filterTelemetry logger *zap.Logger } @@ -31,7 +32,7 @@ func newFilterLogsProcessor(set processor.Settings, cfg *Config) (*filterLogProc logger: set.Logger, } - fpt, err := newfilterProcessorTelemetry(set) + fpt, err := newFilterTelemetry(set, pipeline.SignalLogs) if err != nil { return nil, fmt.Errorf("error creating filter processor telemetry: %w", err) } @@ -92,7 +93,7 @@ func (flp *filterLogProcessor) processLogs(ctx context.Context, ld plog.Logs) (p }) logCountAfterFilters := ld.LogRecordCount() - flp.telemetry.record(triggerLogsDropped, int64(logCountBeforeFilters-logCountAfterFilters)) + flp.telemetry.record(ctx, int64(logCountBeforeFilters-logCountAfterFilters)) if errors != nil { flp.logger.Error("failed processing logs", zap.Error(errors)) diff --git a/processor/filterprocessor/logs_test.go b/processor/filterprocessor/logs_test.go index 0c4783e21bfd..208abf6e4820 100644 --- a/processor/filterprocessor/logs_test.go +++ b/processor/filterprocessor/logs_test.go @@ -794,6 +794,7 @@ func TestFilterLogProcessorTelemetry(t *testing.T) { } tel.assertMetrics(t, want) + require.NoError(t, tel.Shutdown(context.Background())) } func constructLogs() plog.Logs { diff --git a/processor/filterprocessor/metrics.go b/processor/filterprocessor/metrics.go index ac81eaecfaef..63beb811e2d6 100644 --- a/processor/filterprocessor/metrics.go +++ b/processor/filterprocessor/metrics.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/multierr" @@ -29,7 +30,7 @@ type filterMetricProcessor struct { skipResourceExpr expr.BoolExpr[ottlresource.TransformContext] skipMetricExpr expr.BoolExpr[ottlmetric.TransformContext] skipDataPointExpr expr.BoolExpr[ottldatapoint.TransformContext] - telemetry *filterProcessorTelemetry + telemetry *filterTelemetry logger *zap.Logger } @@ -39,7 +40,7 @@ func newFilterMetricProcessor(set processor.Settings, cfg *Config) (*filterMetri logger: set.Logger, } - fpt, err := newfilterProcessorTelemetry(set) + fpt, err := newFilterTelemetry(set, pipeline.SignalMetrics) if err != nil { return nil, fmt.Errorf("error creating filter processor telemetry: %w", err) } @@ -173,7 +174,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric }) metricDataPointCountAfterFilters := md.DataPointCount() - fmp.telemetry.record(triggerMetricDataPointsDropped, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters)) + fmp.telemetry.record(ctx, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters)) if errors != nil { fmp.logger.Error("failed processing metrics", zap.Error(errors)) diff --git a/processor/filterprocessor/metrics_test.go b/processor/filterprocessor/metrics_test.go index 6ba0e029f314..46c2a5785c1d 100644 --- a/processor/filterprocessor/metrics_test.go +++ b/processor/filterprocessor/metrics_test.go @@ -369,7 +369,6 @@ func TestFilterMetricProcessor(t *testing.T) { func TestFilterMetricProcessorTelemetry(t *testing.T) { tel := setupTestTelemetry() - next := new(consumertest.MetricsSink) cfg := &Config{ Metrics: MetricFilters{ MetricConditions: []string{ @@ -377,82 +376,14 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) { }, }, } - factory := NewFactory() - fmp, err := factory.CreateMetrics( - context.Background(), + fmp, err := newFilterMetricProcessor( tel.NewSettings(), cfg, - next, ) assert.NotNil(t, fmp) assert.NoError(t, err) - caps := fmp.Capabilities() - assert.True(t, caps.MutatesData) - ctx := context.Background() - assert.NoError(t, fmp.Start(ctx, nil)) - - err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{ - { - metricNames: []string{"foo", "bar"}, - resourceAttributes: map[string]any{ - "attr1": "attr1/val1", - }, - }, - })) - assert.NoError(t, err) - - want := []metricdata.Metrics{ - { - Name: "otelcol_processor_filter_datapoints.filtered", - Description: "Number of metric data points dropped by the filter processor", - Unit: "1", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 0, - Attributes: attribute.NewSet(attribute.String("filter", "filter")), - }, - }, - }, - }, - { - Name: "otelcol_processor_incoming_items", - Description: "Number of items passed to the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 2, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, - { - Name: "otelcol_processor_outgoing_items", - Description: "Number of items emitted from the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 2, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, - } - - tel.assertMetrics(t, want) - - err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{ + _, err = fmp.processMetrics(context.Background(), testResourceMetrics([]metricWithResource{ { metricNames: []string{"metric1", "metric2"}, resourceAttributes: map[string]any{ @@ -462,7 +393,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) { })) assert.NoError(t, err) - want = []metricdata.Metrics{ + want := []metricdata.Metrics{ { Name: "otelcol_processor_filter_datapoints.filtered", Description: "Number of metric data points dropped by the filter processor", @@ -478,99 +409,9 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) { }, }, }, - { - Name: "otelcol_processor_incoming_items", - Description: "Number of items passed to the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 4, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, - { - Name: "otelcol_processor_outgoing_items", - Description: "Number of items emitted from the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, - } - tel.assertMetrics(t, want) - - err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{ - { - metricNames: []string{"metric1"}, - resourceAttributes: map[string]any{ - "attr1": "attr1/val1", - }, - }, - })) - assert.NoError(t, err) - - want = []metricdata.Metrics{ - { - Name: "otelcol_processor_filter_datapoints.filtered", - Description: "Number of metric data points dropped by the filter processor", - Unit: "1", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 2, - Attributes: attribute.NewSet(attribute.String("filter", "filter")), - }, - }, - }, - }, - { - Name: "otelcol_processor_incoming_items", - Description: "Number of items passed to the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 5, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, - { - Name: "otelcol_processor_outgoing_items", - Description: "Number of items emitted from the processor. [alpha]", - Unit: "{items}", - Data: metricdata.Sum[int64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: true, - DataPoints: []metricdata.DataPoint[int64]{ - { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")), - }, - }, - }, - }, } tel.assertMetrics(t, want) - - assert.NoError(t, fmp.Shutdown(ctx)) + require.NoError(t, tel.Shutdown(context.Background())) } func testResourceMetrics(mwrs []metricWithResource) pmetric.Metrics { diff --git a/processor/filterprocessor/telemetry.go b/processor/filterprocessor/telemetry.go index 30ad4ee36368..dd21cf783ce8 100644 --- a/processor/filterprocessor/telemetry.go +++ b/processor/filterprocessor/telemetry.go @@ -5,7 +5,9 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle import ( "context" + "fmt" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -13,42 +15,35 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata" ) -type trigger int - -const ( - triggerMetricDataPointsDropped trigger = iota - triggerLogsDropped - triggerSpansDropped -) - -type filterProcessorTelemetry struct { - exportCtx context.Context - - processorAttr []attribute.KeyValue - - telemetryBuilder *metadata.TelemetryBuilder +type filterTelemetry struct { + attr metric.MeasurementOption + counter metric.Int64Counter } -func newfilterProcessorTelemetry(set processor.Settings) (*filterProcessorTelemetry, error) { +func newFilterTelemetry(set processor.Settings, signal pipeline.Signal) (*filterTelemetry, error) { telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) if err != nil { return nil, err } - return &filterProcessorTelemetry{ - processorAttr: []attribute.KeyValue{attribute.String(metadata.Type.String(), set.ID.String())}, - exportCtx: context.Background(), - telemetryBuilder: telemetryBuilder, + var counter metric.Int64Counter + switch signal { + case pipeline.SignalMetrics: + counter = telemetryBuilder.ProcessorFilterDatapointsFiltered + case pipeline.SignalLogs: + counter = telemetryBuilder.ProcessorFilterLogsFiltered + case pipeline.SignalTraces: + counter = telemetryBuilder.ProcessorFilterSpansFiltered + default: + return nil, fmt.Errorf("unsupported signal type: %v", signal) + } + + return &filterTelemetry{ + attr: metric.WithAttributeSet(attribute.NewSet(attribute.String(metadata.Type.String(), set.ID.String()))), + counter: counter, }, nil } -func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) { - switch trigger { - case triggerMetricDataPointsDropped: - fpt.telemetryBuilder.ProcessorFilterDatapointsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...)) - case triggerLogsDropped: - fpt.telemetryBuilder.ProcessorFilterLogsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...)) - case triggerSpansDropped: - fpt.telemetryBuilder.ProcessorFilterSpansFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...)) - } +func (fpt *filterTelemetry) record(ctx context.Context, dropped int64) { + fpt.counter.Add(ctx, dropped, fpt.attr) } diff --git a/processor/filterprocessor/traces.go b/processor/filterprocessor/traces.go index 4ca860e638a8..3b444cd5897c 100644 --- a/processor/filterprocessor/traces.go +++ b/processor/filterprocessor/traces.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/multierr" @@ -23,7 +24,7 @@ import ( type filterSpanProcessor struct { skipSpanExpr expr.BoolExpr[ottlspan.TransformContext] skipSpanEventExpr expr.BoolExpr[ottlspanevent.TransformContext] - telemetry *filterProcessorTelemetry + telemetry *filterTelemetry logger *zap.Logger } @@ -33,7 +34,7 @@ func newFilterSpansProcessor(set processor.Settings, cfg *Config) (*filterSpanPr logger: set.Logger, } - fpt, err := newfilterProcessorTelemetry(set) + fpt, err := newFilterTelemetry(set, pipeline.SignalTraces) if err != nil { return nil, fmt.Errorf("error creating filter processor telemetry: %w", err) } @@ -120,7 +121,7 @@ func (fsp *filterSpanProcessor) processTraces(ctx context.Context, td ptrace.Tra }) spanCountAfterFilters := td.SpanCount() - fsp.telemetry.record(triggerSpansDropped, int64(spanCountBeforeFilters-spanCountAfterFilters)) + fsp.telemetry.record(ctx, int64(spanCountBeforeFilters-spanCountAfterFilters)) if errors != nil { fsp.logger.Error("failed processing traces", zap.Error(errors)) diff --git a/processor/filterprocessor/traces_test.go b/processor/filterprocessor/traces_test.go index b23cf4b3c007..bee2b497e2fa 100644 --- a/processor/filterprocessor/traces_test.go +++ b/processor/filterprocessor/traces_test.go @@ -314,6 +314,7 @@ func TestFilterTraceProcessorTelemetry(t *testing.T) { } tel.assertMetrics(t, want) + require.NoError(t, tel.Shutdown(context.Background())) } func constructTraces() ptrace.Traces {