From ea7270cabaf972cb328116b5881be087117f8ff3 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Wed, 19 Jun 2024 05:15:42 -0500 Subject: [PATCH] Add "inserted" metrics to processors (#10372) #### Link to tracking issue Resolves #10353 #### Testing Added equivalent testing to other processor metrics (accepted, refused, dropped). #### Documentation Metric documentation is autogenerated. #### Open Question My initial implementation includes a breaking change to `componenttest.TestTelemetry` which is public facing API. If we want to avoid an immediate breaking change in this test package, I would propose the following, which I can submit in a prerequisite PR: 1. Deprecate all `TestTelemetry.Check*` methods. 2. Replace with more granular `TestTelemetry.CheckOneSpecificMetric` methods. --- .chloggen/processorhelper-inserted-api.yaml | 25 ++++ .chloggen/processorhelper-inserted.yaml | 29 +++++ component/componenttest/obsreporttest.go | 12 +- .../componenttest/otelprometheuschecker.go | 18 +-- .../otelprometheuschecker_test.go | 6 +- .../testdata/prometheus_response | 9 ++ processor/processorhelper/documentation.md | 24 ++++ .../internal/metadata/generated_telemetry.go | 21 ++++ processor/processorhelper/metadata.yaml | 26 ++++- processor/processorhelper/obsreport.go | 41 +++++-- processor/processorhelper/obsreport_test.go | 109 ++++++++++++------ 11 files changed, 258 insertions(+), 62 deletions(-) create mode 100644 .chloggen/processorhelper-inserted-api.yaml create mode 100644 .chloggen/processorhelper-inserted.yaml diff --git a/.chloggen/processorhelper-inserted-api.yaml b/.chloggen/processorhelper-inserted-api.yaml new file mode 100644 index 00000000000..4ce25669388 --- /dev/null +++ b/.chloggen/processorhelper-inserted-api.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component/componenttest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added additional "inserted" count to `TestTelemetry.CheckProcessor*` methods. + +# One or more tracking issues or pull requests related to the change +issues: [10353] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/processorhelper-inserted.yaml b/.chloggen/processorhelper-inserted.yaml new file mode 100644 index 00000000000..7ed6de7e864 --- /dev/null +++ b/.chloggen/processorhelper-inserted.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add "inserted" metrics for processors. + +# One or more tracking issues or pull requests related to the change +issues: [10353] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This includes the following metrics for processors: + - `processor_inserted_spans` + - `processor_inserted_metric_points` + - `processor_inserted_log_records` + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index f342879a533..ba076c65905 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -78,20 +78,20 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) err // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error { - return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans) +func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans, insertedSpans int64) error { + return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans, insertedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { - return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) +func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints int64) error { + return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // When this function is called it is required to also call SetupTelemetry as first thing. -func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { - return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords) +func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords int64) error { + return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords) } // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. diff --git a/component/componenttest/otelprometheuschecker.go b/component/componenttest/otelprometheuschecker.go index 5beef52373a..9dbe60384aa 100644 --- a/component/componenttest/otelprometheuschecker.go +++ b/component/componenttest/otelprometheuschecker.go @@ -48,24 +48,26 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs)) } -func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "spans", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped, inserted int64) error { + return pc.checkProcessor(processor, "spans", accepted, refused, dropped, inserted) } -func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped, inserted int64) error { + return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped, inserted) } -func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "log_records", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped, inserted int64) error { + return pc.checkProcessor(processor, "log_records", accepted, refused, dropped, inserted) } -func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error { +func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped, inserted int64) error { processorAttrs := attributesForProcessorMetrics(processor) return multierr.Combine( pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs), pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs), - pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs)) + pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs), + pc.checkCounter(fmt.Sprintf("processor_inserted_%s", datatype), inserted, processorAttrs), + ) } func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sent, sendFailed int64) error { diff --git a/component/componenttest/otelprometheuschecker_test.go b/component/componenttest/otelprometheuschecker_test.go index d7176134f54..a2efb109355 100644 --- a/component/componenttest/otelprometheuschecker_test.go +++ b/component/componenttest/otelprometheuschecker_test.go @@ -88,17 +88,17 @@ func TestPromChecker(t *testing.T) { ) assert.NoError(t, - pc.checkProcessorTraces(processor, 42, 13, 7), + pc.checkProcessorTraces(processor, 42, 13, 7, 5), "metrics from Receiver Traces should be valid", ) assert.NoError(t, - pc.checkProcessorMetrics(processor, 7, 41, 13), + pc.checkProcessorMetrics(processor, 7, 41, 13, 4), "metrics from Receiver Metrics should be valid", ) assert.NoError(t, - pc.checkProcessorLogs(processor, 102, 35, 14), + pc.checkProcessorLogs(processor, 102, 35, 14, 3), "metrics from Receiver Logs should be valid", ) diff --git a/component/componenttest/testdata/prometheus_response b/component/componenttest/testdata/prometheus_response index fb954905988..09f1af71f46 100644 --- a/component/componenttest/testdata/prometheus_response +++ b/component/componenttest/testdata/prometheus_response @@ -25,6 +25,9 @@ processor_refused_spans{processor="fakeProcessor"} 13 # HELP processor_dropped_spans Number of spans that were dropped. # TYPE processor_dropped_spans counter processor_dropped_spans{processor="fakeProcessor"} 7 +# HELP processor_inserted_spans Number of spans that were inserted. +# TYPE processor_inserted_spans counter +processor_inserted_spans{processor="fakeProcessor"} 5 # HELP processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline. # TYPE processor_accepted_metric_points counter processor_accepted_metric_points{processor="fakeProcessor"} 7 @@ -34,6 +37,9 @@ processor_refused_metric_points{processor="fakeProcessor"} 41 # HELP processor_dropped_metric_points Number of metric points that were dropped. # TYPE processor_dropped_metric_points counter processor_dropped_metric_points{processor="fakeProcessor"} 13 +# HELP processor_inserted_metric_points Number of metric points that were inserted. +# TYPE processor_inserted_metric_points counter +processor_inserted_metric_points{processor="fakeProcessor"} 4 # HELP processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline. # TYPE processor_accepted_log_records counter processor_accepted_log_records{processor="fakeProcessor"} 102 @@ -43,6 +49,9 @@ processor_refused_log_records{processor="fakeProcessor"} 35 # HELP processor_dropped_log_records Number of log records that were dropped. # TYPE processor_dropped_log_records counter processor_dropped_log_records{processor="fakeProcessor"} 14 +# HELP processor_inserted_log_records Number of log records that were inserted. +# TYPE processor_inserted_log_records counter +processor_inserted_log_records{processor="fakeProcessor"} 3 # HELP receiver_accepted_log_records Number of log records successfully pushed into the pipeline. # TYPE receiver_accepted_log_records counter receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102 diff --git a/processor/processorhelper/documentation.md b/processor/processorhelper/documentation.md index 073fc9a7219..bdd45b6ff18 100644 --- a/processor/processorhelper/documentation.md +++ b/processor/processorhelper/documentation.md @@ -54,6 +54,30 @@ Number of spans that were dropped. | ---- | ----------- | ---------- | --------- | | 1 | Sum | Int | true | +### processor_inserted_log_records + +Number of log records that were inserted. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_inserted_metric_points + +Number of metric points that were inserted. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### processor_inserted_spans + +Number of spans that were inserted. + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + ### processor_refused_log_records Number of log records that were rejected by the next component in the pipeline. diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index 94d738f4a63..df28bde4d58 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -31,6 +31,9 @@ type TelemetryBuilder struct { ProcessorDroppedLogRecords metric.Int64Counter ProcessorDroppedMetricPoints metric.Int64Counter ProcessorDroppedSpans metric.Int64Counter + ProcessorInsertedLogRecords metric.Int64Counter + ProcessorInsertedMetricPoints metric.Int64Counter + ProcessorInsertedSpans metric.Int64Counter ProcessorRefusedLogRecords metric.Int64Counter ProcessorRefusedMetricPoints metric.Int64Counter ProcessorRefusedSpans metric.Int64Counter @@ -96,6 +99,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("1"), ) errs = errors.Join(errs, err) + builder.ProcessorInsertedLogRecords, err = builder.meter.Int64Counter( + "processor_inserted_log_records", + metric.WithDescription("Number of log records that were inserted."), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorInsertedMetricPoints, err = builder.meter.Int64Counter( + "processor_inserted_metric_points", + metric.WithDescription("Number of metric points that were inserted."), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorInsertedSpans, err = builder.meter.Int64Counter( + "processor_inserted_spans", + metric.WithDescription("Number of spans that were inserted."), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) builder.ProcessorRefusedLogRecords, err = builder.meter.Int64Counter( "processor_refused_log_records", metric.WithDescription("Number of log records that were rejected by the next component in the pipeline."), diff --git a/processor/processorhelper/metadata.yaml b/processor/processorhelper/metadata.yaml index 7e0c72f5c9c..2b6ec764d1e 100644 --- a/processor/processorhelper/metadata.yaml +++ b/processor/processorhelper/metadata.yaml @@ -33,6 +33,14 @@ telemetry: value_type: int monotonic: true + processor_inserted_spans: + enabled: true + description: Number of spans that were inserted. + unit: 1 + sum: + value_type: int + monotonic: true + processor_accepted_metric_points: enabled: true description: Number of metric points successfully pushed into the next component in the pipeline. @@ -57,6 +65,14 @@ telemetry: value_type: int monotonic: true + processor_inserted_metric_points: + enabled: true + description: Number of metric points that were inserted. + unit: 1 + sum: + value_type: int + monotonic: true + processor_accepted_log_records: enabled: true description: Number of log records successfully pushed into the next component in the pipeline. @@ -79,4 +95,12 @@ telemetry: unit: 1 sum: value_type: int - monotonic: true \ No newline at end of file + monotonic: true + + processor_inserted_log_records: + enabled: true + description: Number of log records that were inserted. + unit: 1 + sum: + value_type: int + monotonic: true diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index ad46303da6b..2a3bd0dc754 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -60,69 +60,88 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) { }, nil } -func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) { - var acceptedCount, refusedCount, droppedCount metric.Int64Counter +func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped, inserted int64) { + var acceptedCount, refusedCount, droppedCount, insertedCount metric.Int64Counter switch dataType { case component.DataTypeTraces: acceptedCount = or.telemetryBuilder.ProcessorAcceptedSpans refusedCount = or.telemetryBuilder.ProcessorRefusedSpans droppedCount = or.telemetryBuilder.ProcessorDroppedSpans + insertedCount = or.telemetryBuilder.ProcessorInsertedSpans case component.DataTypeMetrics: acceptedCount = or.telemetryBuilder.ProcessorAcceptedMetricPoints refusedCount = or.telemetryBuilder.ProcessorRefusedMetricPoints droppedCount = or.telemetryBuilder.ProcessorDroppedMetricPoints + insertedCount = or.telemetryBuilder.ProcessorInsertedMetricPoints case component.DataTypeLogs: acceptedCount = or.telemetryBuilder.ProcessorAcceptedLogRecords refusedCount = or.telemetryBuilder.ProcessorRefusedLogRecords droppedCount = or.telemetryBuilder.ProcessorDroppedLogRecords + insertedCount = or.telemetryBuilder.ProcessorInsertedLogRecords } acceptedCount.Add(ctx, accepted, metric.WithAttributes(or.otelAttrs...)) refusedCount.Add(ctx, refused, metric.WithAttributes(or.otelAttrs...)) droppedCount.Add(ctx, dropped, metric.WithAttributes(or.otelAttrs...)) + insertedCount.Add(ctx, inserted, metric.WithAttributes(or.otelAttrs...)) } // TracesAccepted reports that the trace data was accepted. func (or *ObsReport) TracesAccepted(ctx context.Context, numSpans int) { - or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0)) + or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0), int64(0)) } // TracesRefused reports that the trace data was refused. func (or *ObsReport) TracesRefused(ctx context.Context, numSpans int) { - or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0)) + or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0), int64(0)) } // TracesDropped reports that the trace data was dropped. func (or *ObsReport) TracesDropped(ctx context.Context, numSpans int) { - or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans)) + or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans), int64(0)) +} + +// TracesInserted reports that the trace data was inserted. +func (or *ObsReport) TracesInserted(ctx context.Context, numSpans int) { + or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(0), int64(numSpans)) } // MetricsAccepted reports that the metrics were accepted. func (or *ObsReport) MetricsAccepted(ctx context.Context, numPoints int) { - or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0)) + or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0), int64(0)) } // MetricsRefused reports that the metrics were refused. func (or *ObsReport) MetricsRefused(ctx context.Context, numPoints int) { - or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0)) + or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0), int64(0)) } // MetricsDropped reports that the metrics were dropped. func (or *ObsReport) MetricsDropped(ctx context.Context, numPoints int) { - or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints)) + or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints), int64(0)) +} + +// MetricsInserted reports that the metrics were inserted. +func (or *ObsReport) MetricsInserted(ctx context.Context, numPoints int) { + or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(0), int64(numPoints)) } // LogsAccepted reports that the logs were accepted. func (or *ObsReport) LogsAccepted(ctx context.Context, numRecords int) { - or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0)) + or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0), int64(0)) } // LogsRefused reports that the logs were refused. func (or *ObsReport) LogsRefused(ctx context.Context, numRecords int) { - or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0)) + or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0), int64(0)) } // LogsDropped reports that the logs were dropped. func (or *ObsReport) LogsDropped(ctx context.Context, numRecords int) { - or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords)) + or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords), int64(0)) +} + +// LogsInserted reports that the logs were inserted. +func (or *ObsReport) LogsInserted(ctx context.Context, numRecords int) { + or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(0), int64(numRecords)) } diff --git a/processor/processorhelper/obsreport_test.go b/processor/processorhelper/obsreport_test.go index 52da51007a4..57ae4d04184 100644 --- a/processor/processorhelper/obsreport_test.go +++ b/processor/processorhelper/obsreport_test.go @@ -25,6 +25,8 @@ func TestProcessorTraceData(t *testing.T) { const acceptedSpans = 27 const refusedSpans = 19 const droppedSpans = 13 + const insertedSpans = 5 + obsrep, err := newObsReport(ObsReportSettings{ ProcessorID: processorID, ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, @@ -33,8 +35,9 @@ func TestProcessorTraceData(t *testing.T) { obsrep.TracesAccepted(context.Background(), acceptedSpans) obsrep.TracesRefused(context.Background(), refusedSpans) obsrep.TracesDropped(context.Background(), droppedSpans) + obsrep.TracesInserted(context.Background(), insertedSpans) - require.NoError(t, tt.CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans)) + require.NoError(t, tt.CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans, insertedSpans)) }) } @@ -43,6 +46,7 @@ func TestProcessorMetricsData(t *testing.T) { const acceptedPoints = 29 const refusedPoints = 11 const droppedPoints = 17 + const insertedPoints = 4 obsrep, err := newObsReport(ObsReportSettings{ ProcessorID: processorID, @@ -52,8 +56,9 @@ func TestProcessorMetricsData(t *testing.T) { obsrep.MetricsAccepted(context.Background(), acceptedPoints) obsrep.MetricsRefused(context.Background(), refusedPoints) obsrep.MetricsDropped(context.Background(), droppedPoints) + obsrep.MetricsInserted(context.Background(), insertedPoints) - require.NoError(t, tt.CheckProcessorMetrics(acceptedPoints, refusedPoints, droppedPoints)) + require.NoError(t, tt.CheckProcessorMetrics(acceptedPoints, refusedPoints, droppedPoints, insertedPoints)) }) } @@ -84,6 +89,7 @@ func TestProcessorLogRecords(t *testing.T) { const acceptedRecords = 29 const refusedRecords = 11 const droppedRecords = 17 + const insertedRecords = 3 obsrep, err := newObsReport(ObsReportSettings{ ProcessorID: processorID, @@ -93,8 +99,9 @@ func TestProcessorLogRecords(t *testing.T) { obsrep.LogsAccepted(context.Background(), acceptedRecords) obsrep.LogsRefused(context.Background(), refusedRecords) obsrep.LogsDropped(context.Background(), droppedRecords) + obsrep.LogsInserted(context.Background(), insertedRecords) - require.NoError(t, tt.CheckProcessorLogs(acceptedRecords, refusedRecords, droppedRecords)) + require.NoError(t, tt.CheckProcessorLogs(acceptedRecords, refusedRecords, droppedRecords, insertedRecords)) }) } @@ -112,15 +119,24 @@ func TestCheckProcessorTracesViews(t *testing.T) { por.TracesAccepted(context.Background(), 7) por.TracesRefused(context.Background(), 8) por.TracesDropped(context.Background(), 9) - - assert.NoError(t, tt.CheckProcessorTraces(7, 8, 9)) - assert.Error(t, tt.CheckProcessorTraces(0, 0, 0)) - assert.Error(t, tt.CheckProcessorTraces(7, 0, 0)) - assert.Error(t, tt.CheckProcessorTraces(7, 8, 0)) - assert.Error(t, tt.CheckProcessorTraces(7, 0, 9)) - assert.Error(t, tt.CheckProcessorTraces(0, 8, 0)) - assert.Error(t, tt.CheckProcessorTraces(0, 8, 9)) - assert.Error(t, tt.CheckProcessorTraces(0, 0, 9)) + por.TracesInserted(context.Background(), 10) + + assert.NoError(t, tt.CheckProcessorTraces(7, 8, 9, 10)) + assert.Error(t, tt.CheckProcessorTraces(0, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorTraces(7, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorTraces(0, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorTraces(0, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorTraces(0, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorTraces(7, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorTraces(7, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorTraces(7, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorTraces(0, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorTraces(0, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorTraces(0, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorTraces(7, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorTraces(7, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorTraces(7, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorTraces(0, 8, 9, 10)) } func TestCheckProcessorMetricsViews(t *testing.T) { @@ -137,15 +153,24 @@ func TestCheckProcessorMetricsViews(t *testing.T) { por.MetricsAccepted(context.Background(), 7) por.MetricsRefused(context.Background(), 8) por.MetricsDropped(context.Background(), 9) - - assert.NoError(t, tt.CheckProcessorMetrics(7, 8, 9)) - assert.Error(t, tt.CheckProcessorMetrics(0, 0, 0)) - assert.Error(t, tt.CheckProcessorMetrics(7, 0, 0)) - assert.Error(t, tt.CheckProcessorMetrics(7, 8, 0)) - assert.Error(t, tt.CheckProcessorMetrics(7, 0, 9)) - assert.Error(t, tt.CheckProcessorMetrics(0, 8, 0)) - assert.Error(t, tt.CheckProcessorMetrics(0, 8, 9)) - assert.Error(t, tt.CheckProcessorMetrics(0, 0, 9)) + por.MetricsInserted(context.Background(), 10) + + assert.NoError(t, tt.CheckProcessorMetrics(7, 8, 9, 10)) + assert.Error(t, tt.CheckProcessorMetrics(0, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorMetrics(7, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorMetrics(0, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorMetrics(0, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorMetrics(0, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorMetrics(7, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorMetrics(7, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorMetrics(7, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorMetrics(0, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorMetrics(0, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorMetrics(0, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorMetrics(7, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorMetrics(7, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorMetrics(7, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorMetrics(0, 8, 9, 10)) } func TestCheckProcessorLogViews(t *testing.T) { @@ -162,15 +187,24 @@ func TestCheckProcessorLogViews(t *testing.T) { por.LogsAccepted(context.Background(), 7) por.LogsRefused(context.Background(), 8) por.LogsDropped(context.Background(), 9) - - assert.NoError(t, tt.CheckProcessorLogs(7, 8, 9)) - assert.Error(t, tt.CheckProcessorLogs(0, 0, 0)) - assert.Error(t, tt.CheckProcessorLogs(7, 0, 0)) - assert.Error(t, tt.CheckProcessorLogs(7, 8, 0)) - assert.Error(t, tt.CheckProcessorLogs(7, 0, 9)) - assert.Error(t, tt.CheckProcessorLogs(0, 8, 0)) - assert.Error(t, tt.CheckProcessorLogs(0, 8, 9)) - assert.Error(t, tt.CheckProcessorLogs(0, 0, 9)) + por.LogsInserted(context.Background(), 10) + + assert.NoError(t, tt.CheckProcessorLogs(7, 8, 9, 10)) + assert.Error(t, tt.CheckProcessorLogs(0, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorLogs(7, 0, 0, 0)) + assert.Error(t, tt.CheckProcessorLogs(0, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorLogs(0, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorLogs(0, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorLogs(7, 8, 0, 0)) + assert.Error(t, tt.CheckProcessorLogs(7, 0, 9, 0)) + assert.Error(t, tt.CheckProcessorLogs(7, 0, 0, 10)) + assert.Error(t, tt.CheckProcessorLogs(0, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorLogs(0, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorLogs(0, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorLogs(7, 8, 9, 0)) + assert.Error(t, tt.CheckProcessorLogs(7, 8, 0, 10)) + assert.Error(t, tt.CheckProcessorLogs(7, 0, 9, 10)) + assert.Error(t, tt.CheckProcessorLogs(0, 8, 9, 10)) } func TestNoMetrics(t *testing.T) { @@ -179,6 +213,8 @@ func TestNoMetrics(t *testing.T) { const accepted = 29 const refused = 11 const dropped = 17 + const inserted = 5 + set := tt.TelemetrySettings() set.MetricsLevel = configtelemetry.LevelNone @@ -191,13 +227,16 @@ func TestNoMetrics(t *testing.T) { por.TracesAccepted(context.Background(), accepted) por.TracesRefused(context.Background(), refused) por.TracesDropped(context.Background(), dropped) + por.TracesInserted(context.Background(), inserted) - require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped)) + require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped, inserted)) }) testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 + const inserted = 4 + set := tt.TelemetrySettings() set.MetricsLevel = configtelemetry.LevelNone @@ -210,13 +249,16 @@ func TestNoMetrics(t *testing.T) { por.MetricsAccepted(context.Background(), accepted) por.MetricsRefused(context.Background(), refused) por.MetricsDropped(context.Background(), dropped) + por.MetricsInserted(context.Background(), inserted) - require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped)) + require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped, inserted)) }) testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 + const inserted = 3 + set := tt.TelemetrySettings() set.MetricsLevel = configtelemetry.LevelNone @@ -229,8 +271,9 @@ func TestNoMetrics(t *testing.T) { por.LogsAccepted(context.Background(), accepted) por.LogsRefused(context.Background(), refused) por.LogsDropped(context.Background(), dropped) + por.LogsInserted(context.Background(), inserted) - require.Error(t, tt.CheckProcessorLogs(accepted, refused, dropped)) + require.Error(t, tt.CheckProcessorLogs(accepted, refused, dropped, inserted)) }) }