From 0df2648cae639cda18362c9bd3c89994755ef5d5 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 28 Nov 2024 18:26:06 +0100 Subject: [PATCH] *: review feedback --- .../benchmark_test.go | 143 ++++++++---------- .../internal/testing/sdktest/metrics.go | 11 -- 2 files changed, 61 insertions(+), 93 deletions(-) diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go index fd71734ddd39..cfe6e5146c8d 100644 --- a/processor/deltatocumulativeprocessor/benchmark_test.go +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -19,7 +18,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" ) var out *consumertest.MetricsSink @@ -30,44 +28,15 @@ func BenchmarkProcessor(gb *testing.B) { streams = 10 ) + now := time.Now() + start := pcommon.NewTimestampFromTime(now) + ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) + type Case struct { name string fill func(m pmetric.Metric) next func(m pmetric.Metric) } - - run := func(b *testing.B, proc consumer.Metrics, cs Case) { - md := pmetric.NewMetrics() - ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() - for i := range metrics { - m := ms.AppendEmpty() - m.SetName(strconv.Itoa(i)) - cs.fill(m) - } - - b.ReportAllocs() - b.ResetTimer() - b.StopTimer() - - ctx := context.Background() - for range b.N { - for i := range ms.Len() { - cs.next(ms.At(i)) - } - req := pmetric.NewMetrics() - md.CopyTo(req) - - b.StartTimer() - err := proc.ConsumeMetrics(ctx, req) - b.StopTimer() - require.NoError(b, err) - } - } - - now := time.Now() - start := pcommon.NewTimestampFromTime(now) - ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) - cases := []Case{{ name: "sums", fill: func(m pmetric.Metric) { @@ -81,7 +50,16 @@ func BenchmarkProcessor(gb *testing.B) { dp.SetTimestamp(ts) } }, - next: next(pmetric.Metric.Sum), + next: func(m pmetric.Metric) { + dps := m.Sum().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }, { name: "histogram", fill: func(m pmetric.Metric) { @@ -101,7 +79,16 @@ func BenchmarkProcessor(gb *testing.B) { dp.Attributes().PutStr("idx", strconv.Itoa(i)) } }, - next: next(pmetric.Metric.Histogram), + next: func(m pmetric.Metric) { + dps := m.Histogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }, { name: "exponential", fill: func(m pmetric.Metric) { @@ -123,60 +110,52 @@ func BenchmarkProcessor(gb *testing.B) { dp.Attributes().PutStr("idx", strconv.Itoa(i)) } }, - next: next(pmetric.Metric.ExponentialHistogram), + next: func(m pmetric.Metric) { + dps := m.ExponentialHistogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }} - tel := func(n int) sdktest.Spec { - total := int64(n * metrics * streams) - tracked := int64(metrics * streams) - return sdktest.Expect(map[string]sdktest.Metric{ - "otelcol_deltatocumulative.datapoints.linear": { - Type: sdktest.TypeSum, - Numbers: []sdktest.Number{{Int: &total}}, - Monotonic: true, - }, - "otelcol_deltatocumulative.streams.tracked.linear": { - Type: sdktest.TypeSum, - Numbers: []sdktest.Number{{Int: &tracked}}, - }, - }) - } - for _, cs := range cases { gb.Run(cs.name, func(b *testing.B) { st := setup(b, nil) out = st.sink - run(b, st.proc, cs) - // verify all dps are processed without error + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for i := range metrics { + m := ms.AppendEmpty() + m.SetName(strconv.Itoa(i)) + cs.fill(m) + } + + b.ReportAllocs() + b.ResetTimer() b.StopTimer() - if err := sdktest.Test(tel(b.N), st.tel.reader); err != nil { - b.Fatal(err) + + ctx := context.Background() + for range b.N { + for i := range ms.Len() { + cs.next(ms.At(i)) + } + req := pmetric.NewMetrics() + md.CopyTo(req) + + b.StartTimer() + err := st.proc.ConsumeMetrics(ctx, req) + b.StopTimer() + require.NoError(b, err) } - }) - } -} -func next[ - T interface{ DataPoints() Ps }, - Ps interface { - At(int) P - Len() int - }, - P interface { - Timestamp() pcommon.Timestamp - SetStartTimestamp(pcommon.Timestamp) - SetTimestamp(pcommon.Timestamp) - }, -](sel func(pmetric.Metric) T) func(m pmetric.Metric) { - return func(m pmetric.Metric) { - dps := sel(m).DataPoints() - for i := range dps.Len() { - dp := dps.At(i) - dp.SetStartTimestamp(dp.Timestamp()) - dp.SetTimestamp(pcommon.NewTimestampFromTime( - dp.Timestamp().AsTime().Add(time.Minute), - )) - } + // verify all dps are processed without error + b.StopTimer() + require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount()) + }) } } diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go index e2c5a77d293a..440d094ba274 100644 --- a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go @@ -123,14 +123,3 @@ func (attr attributes) Into() attribute.Set { // // Temporality is optional and defaults to [sdk.CumulativeTemporality] type Format = []byte - -func Expect(metrics map[string]Metric) Spec { - for name, m := range metrics { - m.Name = name - if m.Temporality == 0 { - m.Temporality = sdk.CumulativeTemporality - } - metrics[name] = m - } - return metrics -}