diff --git a/processor/deltatocumulativeprocessor/chain.go b/processor/deltatocumulativeprocessor/chain.go
deleted file mode 100644
index 0a39ea8939c7..000000000000
--- a/processor/deltatocumulativeprocessor/chain.go
+++ /dev/null
@@ -1,51 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
-
-import (
-	"context"
-
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/pdata/pmetric"
-	"go.opentelemetry.io/collector/processor"
-)
-
-var _ processor.Metrics = Chain(nil)
-
-// Chain calls processors in series.
-// They must be manually set up so that their ConsumeMetrics() methods invoke each other
-type Chain []processor.Metrics
-
-func (c Chain) Capabilities() consumer.Capabilities {
-	if len(c) == 0 {
-		return consumer.Capabilities{}
-	}
-	return c[0].Capabilities()
-}
-
-func (c Chain) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
-	if len(c) == 0 {
-		return nil
-	}
-	return c[0].ConsumeMetrics(ctx, md)
-}
-
-func (c Chain) Shutdown(ctx context.Context) error {
-	for _, proc := range c {
-		if err := proc.Shutdown(ctx); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (c Chain) Start(ctx context.Context, host component.Host) error {
-	for _, proc := range c {
-		if err := proc.Start(ctx, host); err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go
index 4cb97c91f59b..f5b5c1c59dfa 100644
--- a/processor/deltatocumulativeprocessor/config.go
+++ b/processor/deltatocumulativeprocessor/config.go
@@ -11,7 +11,7 @@ import (
 	"go.opentelemetry.io/collector/component"
 
-	telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
+	telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
 )
 
 var _ component.ConfigValidator = (*Config)(nil)
diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go
index 3f76a4e11b4c..904ae1ee6827 100644
--- a/processor/deltatocumulativeprocessor/factory.go
+++ b/processor/deltatocumulativeprocessor/factory.go
@@ -11,8 +11,8 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/processor"
 
-	ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
 )
 
 func NewFactory() processor.Factory {
@@ -29,13 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
 		return nil, fmt.Errorf("configuration parsing error")
 	}
 
-	ltel, err := ltel.New(set.TelemetrySettings)
+	tel, err := telemetry.New(set.TelemetrySettings)
 	if err != nil {
 		return nil, err
 	}
 
-	proc := newProcessor(pcfg, set.Logger, &ltel.TelemetryBuilder, next)
-	linear := newLinear(pcfg, ltel, proc)
-
-	return Chain{linear, proc}, nil
+	return newProcessor(pcfg, tel, next), nil
 }
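The removed `Chain` type relied on construction-time wiring rather than runtime iteration: `ConsumeMetrics` only ever called the first element, because each stage had been built with the next stage as its consumer. A minimal sketch of the old wiring, using only names from the deleted code above:

```go
// Old two-stage setup, as the deleted factory composed it: `linear` consumes
// first and forwards to `proc`, which forwards to the pipeline's `next`.
proc := newProcessor(pcfg, set.Logger, &ltel.TelemetryBuilder, next) // tail stage
linear := newLinear(pcfg, ltel, proc)                                // head stage
pipeline := Chain{linear, proc} // Consume enters at linear; Start/Shutdown fan out to both
```

With the linear path now the only implementation, the factory returns a single processor and the fan-out type is no longer needed.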
diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod
index 79f8b05353b5..1b6b315467d1 100644
--- a/processor/deltatocumulativeprocessor/go.mod
+++ b/processor/deltatocumulativeprocessor/go.mod
@@ -21,7 +21,6 @@ require (
 	go.opentelemetry.io/otel/sdk/metric v1.32.0
 	go.opentelemetry.io/otel/trace v1.32.0
 	go.uber.org/goleak v1.3.0
-	go.uber.org/zap v1.27.0
 	golang.org/x/tools v0.26.0
 	gopkg.in/yaml.v3 v3.0.1
 )
@@ -52,6 +51,7 @@
 	go.opentelemetry.io/collector/processor/processorprofiles v0.114.1-0.20241202231142-b9ff1bc54c99 // indirect
 	go.opentelemetry.io/otel/sdk v1.32.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
+	go.uber.org/zap v1.27.0 // indirect
 	golang.org/x/net v0.30.0 // indirect
 	golang.org/x/sys v0.27.0 // indirect
 	golang.org/x/text v0.19.0 // indirect
diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go
index 2460af09c1b8..3a36f6d552d2 100644
--- a/processor/deltatocumulativeprocessor/internal/data/data.go
+++ b/processor/deltatocumulativeprocessor/internal/data/data.go
@@ -4,116 +4,23 @@ package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
 
 import (
-	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
 )
 
-var (
-	_ Point[Number]       = Number{}
-	_ Point[Histogram]    = Histogram{}
-	_ Point[ExpHistogram] = ExpHistogram{}
-	_ Point[Summary]      = Summary{}
-)
-
-type Point[Self any] interface {
-	StartTimestamp() pcommon.Timestamp
-	Timestamp() pcommon.Timestamp
-	Attributes() pcommon.Map
-
-	Clone() Self
-	CopyTo(Self)
-
-	Add(Self) Self
-}
-
-type Typed[Self any] interface {
-	Point[Self]
-	Number | Histogram | ExpHistogram | Summary
-}
-
 type Number struct {
 	pmetric.NumberDataPoint
 }
 
-func Zero[P Typed[P]]() P {
-	var point P
-	switch ty := any(&point).(type) {
-	case *Number:
-		ty.NumberDataPoint = pmetric.NewNumberDataPoint()
-	case *Histogram:
-		ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
-	case *ExpHistogram:
-		ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
-	}
-	return point
-}
-
-func (dp Number) Clone() Number {
-	clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
-	if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
-		dp.CopyTo(clone)
-	}
-	return clone
-}
-
-func (dp Number) CopyTo(dst Number) {
-	dp.NumberDataPoint.CopyTo(dst.NumberDataPoint)
-}
-
 type Histogram struct {
 	pmetric.HistogramDataPoint
 }
 
-func (dp Histogram) Clone() Histogram {
-	clone := Histogram{HistogramDataPoint: pmetric.NewHistogramDataPoint()}
-	if dp.HistogramDataPoint != (pmetric.HistogramDataPoint{}) {
-		dp.CopyTo(clone)
-	}
-	return clone
-}
-
-func (dp Histogram) CopyTo(dst Histogram) {
-	dp.HistogramDataPoint.CopyTo(dst.HistogramDataPoint)
-}
-
 type ExpHistogram struct {
 	expo.DataPoint
 }
 
-func (dp ExpHistogram) Clone() ExpHistogram {
-	clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
-	if dp.DataPoint != (expo.DataPoint{}) {
-		dp.CopyTo(clone)
-	}
-	return clone
-}
-
-func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
-	dp.DataPoint.CopyTo(dst.DataPoint)
-}
-
-type mustPoint[D Point[D]] struct{ _ D }
-
-var (
-	_ = mustPoint[Number]{}
-	_ = mustPoint[Histogram]{}
-	_ = mustPoint[ExpHistogram]{}
-)
-
 type Summary struct {
 	pmetric.SummaryDataPoint
 }
-
-func (dp Summary) Clone() Summary {
-	clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
-	if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
-		dp.CopyTo(clone)
-	}
-	return clone
-}
-
-func (dp Summary) CopyTo(dst Summary) {
-	dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
-}
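The deleted `Clone`/`CopyTo` helpers existed because pdata points are handles into a shared backing store, not values: plain assignment aliases, it does not copy. A small runnable illustration of that semantics, using only the public `pmetric` API (not code from this PR):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	src := pmetric.NewNumberDataPoint()
	src.SetIntValue(42)

	alias := src // same underlying storage: mutating one mutates "both"
	alias.SetIntValue(7)
	fmt.Println(src.IntValue()) // 7

	clone := pmetric.NewNumberDataPoint()
	src.CopyTo(clone) // deep copy; clone is now independent
	clone.SetIntValue(1)
	fmt.Println(src.IntValue()) // still 7
}
```

This is why the old `Clone` allocated a fresh point and copied into it, rather than returning the receiver.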
diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go
index 3320d44f2724..f2a759e9bfe9 100644
--- a/processor/deltatocumulativeprocessor/internal/delta/delta.go
+++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go
@@ -9,55 +9,9 @@ import (
 	"go.opentelemetry.io/collector/pdata/pcommon"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 
-	exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
 )
 
-func New[D data.Point[D]]() Accumulator[D] {
-	return Accumulator[D]{
-		Map: make(exp.HashMap[D]),
-	}
-}
-
-var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil)
-
-type Accumulator[D data.Point[D]] struct {
-	streams.Map[D]
-}
-
-func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
-	aggr, ok := a.Map.Load(id)
-
-	// new series: initialize with current sample
-	if !ok {
-		clone := dp.Clone()
-		return a.Map.Store(id, clone)
-	}
-
-	// drop bad samples
-	switch {
-	case dp.StartTimestamp() < aggr.StartTimestamp():
-		// belongs to older series
-		return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
-	case dp.Timestamp() <= aggr.Timestamp():
-		// out of order
-		return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
-	}
-
-	// detect gaps
-	var gap error
-	if dp.StartTimestamp() > aggr.Timestamp() {
-		gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
-	}
-
-	res := aggr.Add(dp)
-	if err := a.Map.Store(id, res); err != nil {
-		return err
-	}
-	return gap
-}
-
 type ErrOlderStart struct {
 	Start  pcommon.Timestamp
 	Sample pcommon.Timestamp
@@ -76,14 +30,6 @@ func (e ErrOutOfOrder) Error() string {
 	return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
 }
 
-type ErrGap struct {
-	From, To pcommon.Timestamp
-}
-
-func (e ErrGap) Error() string {
-	return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
-}
-
 type Type interface {
 	pmetric.NumberDataPoint |
 		pmetric.HistogramDataPoint |
 		pmetric.ExponentialHistogramDataPoint
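The generic `Accumulator` is gone, but the two error types kept above still encode the ordering rules the processor enforces per stream. A condensed sketch of the checks the deleted `Store` performed (the `last`/`dp` names are illustrative; the comparisons and error values are taken directly from the removed code):

```go
// check rejects samples that cannot extend the accumulated series.
func check(last, dp data.Number) error {
	switch {
	case dp.StartTimestamp() < last.StartTimestamp():
		// belongs to an older incarnation of the series
		return delta.ErrOlderStart{Start: last.StartTimestamp(), Sample: dp.StartTimestamp()}
	case dp.Timestamp() <= last.Timestamp():
		// arrived late: the series already advanced past this point
		return delta.ErrOutOfOrder{Last: last.Timestamp(), Sample: dp.Timestamp()}
	}
	return nil
}
```

Gap detection (`ErrGap`) is dropped entirely rather than relocated, so gaps are no longer surfaced as errors.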
diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go
deleted file mode 100644
index 961428ed3c4a..000000000000
--- a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go
+++ /dev/null
@@ -1,229 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package delta_test
-
-import (
-	"fmt"
-	"math/rand"
-	"strconv"
-	"sync"
-	"testing"
-
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/pdata/pcommon"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
-)
-
-var result any
-
-func aggr[P point[P]]() streams.Aggregator[P] {
-	return streams.IntoAggregator(delta.New[P]())
-}
-
-func BenchmarkAccumulator(b *testing.B) {
-	acc := aggr[data.Number]()
-	sum := random.Sum()
-
-	bench := func(b *testing.B, nstreams int) {
-		nsamples := b.N / nstreams
-
-		ids := make([]streams.Ident, nstreams)
-		dps := make([]data.Number, nstreams)
-		for i := 0; i < nstreams; i++ {
-			ids[i], dps[i] = sum.Stream()
-		}
-
-		b.ResetTimer()
-
-		var wg sync.WaitGroup
-		for i := 0; i < nstreams; i++ {
-			wg.Add(1)
-			go func(id streams.Ident, num data.Number) {
-				for n := 0; n < nsamples; n++ {
-					num.SetTimestamp(num.Timestamp() + 1)
-					val, err := acc.Aggregate(id, num)
-					if err != nil {
-						panic(err)
-					}
-					result = val
-				}
-				wg.Done()
-			}(ids[i], dps[i])
-		}
-
-		wg.Wait()
-	}
-
-	nstreams := []int{1, 2, 10, 100, 1000}
-	for _, n := range nstreams {
-		b.Run(strconv.Itoa(n), func(b *testing.B) {
-			bench(b, n)
-		})
-	}
-}
-
-// verify the distinction between streams and the accumulated value
-func TestAddition(t *testing.T) {
-	acc := aggr[data.Number]()
-	sum := random.Sum()
-
-	type Idx int
-	type Stream struct {
-		idx Idx
-		id  streams.Ident
-		dp  data.Number
-	}
-
-	streams := make([]Stream, 10)
-	for i := range streams {
-		id, dp := sum.Stream()
-		streams[i] = Stream{
-			idx: Idx(i),
-			id:  id,
-			dp:  dp,
-		}
-	}
-
-	want := make(map[Idx]int64)
-	for i := 0; i < 100; i++ {
-		stream := streams[rand.Intn(10)]
-		dp := stream.dp.Clone()
-		dp.SetTimestamp(dp.Timestamp() + pcommon.Timestamp(i))
-
-		val := int64(rand.Intn(255))
-		dp.SetIntValue(val)
-		want[stream.idx] += val
-
-		got, err := acc.Aggregate(stream.id, dp)
-		require.NoError(t, err)
-
-		require.Equal(t, want[stream.idx], got.IntValue())
-	}
-}
-
-// verify that start + last times are updated
-func TestTimes(t *testing.T) {
-	t.Run("sum", testTimes(random.Sum()))
-	t.Run("histogram", testTimes(random.Histogram()))
-	t.Run("exponential", testTimes(random.Exponential()))
-}
-
-func testTimes[P point[P]](metric random.Metric[P]) func(t *testing.T) {
-	return func(t *testing.T) {
-		acc := aggr[P]()
-		id, base := metric.Stream()
-		point := func(start, last pcommon.Timestamp) P {
-			dp := base.Clone()
-			dp.SetStartTimestamp(start)
-			dp.SetTimestamp(last)
-			return dp
-		}
-
-		// first sample: it's the first ever, so take it as-is
-		{
-			dp := point(1000, 1000)
-			res, err := acc.Aggregate(id, dp)
-
-			require.NoError(t, err)
-			require.Equal(t, time(1000), res.StartTimestamp())
-			require.Equal(t, time(1000), res.Timestamp())
-		}
-
-		// second sample: it's subsequent, so keep original startTime, but update lastSeen
-		{
-			dp := point(1000, 1100)
-			res, err := acc.Aggregate(id, dp)
-
-			require.NoError(t, err)
-			require.Equal(t, time(1000), res.StartTimestamp())
-			require.Equal(t, time(1100), res.Timestamp())
-		}
-
-		// third sample: it's subsequent, but has a more recent startTime, which is
-		// PERMITTED by the spec.
-		// still keep original startTime, but update lastSeen.
-		{
-			dp := point(1100, 1200)
-			res, err := acc.Aggregate(id, dp)
-
-			require.NoError(t, err)
-			require.Equal(t, time(1000), res.StartTimestamp())
-			require.Equal(t, time(1200), res.Timestamp())
-		}
-	}
-}
-
-type point[Self any] interface {
-	random.Point[Self]
-
-	SetTimestamp(pcommon.Timestamp)
-	SetStartTimestamp(pcommon.Timestamp)
-}
-
-func TestErrs(t *testing.T) {
-	type Point struct {
-		Start int
-		Time  int
-		Value int
-	}
-	type Case struct {
-		Good Point
-		Bad  Point
-		Err  error
-	}
-
-	cases := []Case{
-		{
-			Good: Point{Start: 1234, Time: 1337, Value: 42},
-			Bad:  Point{Start: 1000, Time: 2000, Value: 24},
-			Err:  delta.ErrOlderStart{Start: time(1234), Sample: time(1000)},
-		},
-		{
-			Good: Point{Start: 1234, Time: 1337, Value: 42},
-			Bad:  Point{Start: 1234, Time: 1336, Value: 24},
-			Err:  delta.ErrOutOfOrder{Last: time(1337), Sample: time(1336)},
-		},
-	}
-
-	for _, c := range cases {
-		c := c
-		t.Run(fmt.Sprintf("%T", c.Err), func(t *testing.T) {
-			acc := aggr[data.Number]()
-			id, data := random.Sum().Stream()
-
-			good := data.Clone()
-			good.SetStartTimestamp(pcommon.Timestamp(c.Good.Start))
-			good.SetTimestamp(pcommon.Timestamp(c.Good.Time))
-			good.SetIntValue(int64(c.Good.Value))
-
-			r1, err := acc.Aggregate(id, good)
-			require.NoError(t, err)
-
-			require.Equal(t, good.StartTimestamp(), r1.StartTimestamp())
-			require.Equal(t, good.Timestamp(), r1.Timestamp())
-			require.Equal(t, good.IntValue(), r1.IntValue())
-
-			bad := data.Clone()
-			bad.SetStartTimestamp(pcommon.Timestamp(c.Bad.Start))
-			bad.SetTimestamp(pcommon.Timestamp(c.Bad.Time))
-			bad.SetIntValue(int64(c.Bad.Value))
-
-			r2, err := acc.Aggregate(id, bad)
-			require.ErrorIs(t, err, c.Err)
-
-			// sample must be dropped => no change
-			require.Equal(t, r1.StartTimestamp(), r2.StartTimestamp())
-			require.Equal(t, r1.Timestamp(), r2.Timestamp())
-			require.Equal(t, r1.IntValue(), r2.IntValue())
-		})
-	}
-}
-
-func time(ts int) pcommon.Timestamp {
-	return pcommon.Timestamp(ts)
-}
diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go
deleted file mode 100644
index 7576883075c5..000000000000
--- a/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
-
-import (
-	"context"
-	"errors"
-	"reflect"
-
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/metric"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
-)
-
-func New(set component.TelemetrySettings) (Metrics, error) {
-	zero := func() int { return -1 }
-	m := Metrics{
-		tracked: &zero,
-	}
-
-	trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 {
-		return int64((*m.tracked)())
-	})
-
-	telb, err := metadata.NewTelemetryBuilder(set, trackedCb)
-	if err != nil {
-		return Metrics{}, err
-	}
-	m.TelemetryBuilder = *telb
-
-	return m, nil
-}
-
-type Metrics struct {
-	metadata.TelemetryBuilder
-
-	tracked *func() int
-}
-
-func (m Metrics) Datapoints() Counter {
-	return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear}
-}
-
-func (m *Metrics) WithTracked(streams func() int) {
-	*m.tracked = streams
-}
-
-func Error(msg string) attribute.KeyValue {
-	return attribute.String("error", msg)
-}
-
-func Cause(err error) attribute.KeyValue {
-	for {
-		uw := errors.Unwrap(err)
-		if uw == nil {
-			break
-		}
-		err = uw
-	}
-
-	return Error(reflect.TypeOf(err).String())
-}
-
-type Counter struct{ metric.Int64Counter }
-
-func (c Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) {
-	c.Add(ctx, 1, metric.WithAttributes(attrs...))
-}
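Worth noting in the `New` above (the pattern survives unchanged in the new `internal/telemetry` package further down): the tracked-streams callback is registered with the `TelemetryBuilder` before any processor exists, so it reads through a `*func() int` that `WithTracked` swaps out later. A stripped-down, runnable sketch of just that indirection:

```go
package main

import "fmt"

type Metrics struct {
	tracked *func() int
}

func (m *Metrics) WithTracked(streams func() int) { *m.tracked = streams }

func main() {
	zero := func() int { return -1 } // placeholder until a processor registers itself
	m := Metrics{tracked: &zero}

	// The observable callback captures the pointer, not the function value,
	// so it always sees the most recently installed closure.
	observe := func() int64 { return int64((*m.tracked)()) }

	fmt.Println(observe()) // -1
	m.WithTracked(func() int { return 42 })
	fmt.Println(observe()) // 42
}
```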
diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go
deleted file mode 100644
index 8f40b8d277b3..000000000000
--- a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-// maybe provides utilities for representing data that may or may not exist at
-// runtime in a safe way.
-//
-// A typical approach to this is using pointers, but they suffer from two issues:
-//   - Unsafety: permitting nil pointers requires careful checking on each use,
-//     which is easily forgotten
-//   - Blindness: nil itself cannot differentiate between "set to nil" and
-//     "not set at all", leading to unexpected edge cases
-//
-// The [Ptr] type of this package provides a safe alternative with a clear
-// distinction between "not set" and "set to nil".
-package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
-
-// Ptr references some value of type T that is not guaranteed to exist.
-// Callers must use [Ptr.Try] to access the underlying value, checking the
-// ok return value too.
-// This provides a clear distinction between "not set" and "set to nil".
-//
-// Use [Some] and [None] to create Ptrs.
-type Ptr[T any] struct {
-	to *T
-	ok bool
-}
-
-// None returns a Ptr that represents "not-set".
-// This is equal to a zero-value Ptr.
-func None[T any]() Ptr[T] {
-	return Ptr[T]{to: nil, ok: false}
-}
-
-// Some returns a pointer to the passed T.
-//
-// The ptr argument may be nil, in which case this represents "explicitly set to
-// nil".
-func Some[T any](ptr *T) Ptr[T] {
-	return Ptr[T]{to: ptr, ok: true}
-}
-
-// Try attempts to de-reference the Ptr, giving one of three results:
-//
-//   - nil, false: not-set
-//   - nil, true: explicitly set to nil
-//   - non-nil, true: set to some value
-//
-// This provides extra safety over bare pointers, because callers are forced by
-// the compiler to either check or explicitly ignore the ok value.
-func (ptr Ptr[T]) Try() (_ *T, ok bool) {
-	return ptr.to, ptr.ok
-}
diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go
deleted file mode 100644
index c32c34e7e505..000000000000
--- a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package maybe_test
-
-import (
-	"fmt"
-	"testing"
-
-	"github.com/stretchr/testify/require"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
-)
-
-func TestMaybe(t *testing.T) {
-	t.Run("zero-not-ok", func(t *testing.T) {
-		var ptr maybe.Ptr[int]
-		_, ok := ptr.Try()
-		require.False(t, ok)
-	})
-	t.Run("none-not-ok", func(t *testing.T) {
-		ptr := maybe.None[int]()
-		_, ok := ptr.Try()
-		require.False(t, ok)
-	})
-	t.Run("explicit-nil", func(t *testing.T) {
-		ptr := maybe.Some[int](nil)
-		v, ok := ptr.Try()
-		require.Nil(t, v)
-		require.True(t, ok)
-	})
-	t.Run("value", func(t *testing.T) {
-		num := 42
-		ptr := maybe.Some(&num)
-		v, ok := ptr.Try()
-		require.True(t, ok)
-		require.Equal(t, num, *v)
-	})
-}
-
-func ExamplePtr() {
-	var unset maybe.Ptr[int] // = maybe.None()
-	if v, ok := unset.Try(); ok {
-		fmt.Println("unset:", v)
-	} else {
-		fmt.Println("unset: !ok")
-	}
-
-	var xnil maybe.Ptr[int] = maybe.Some[int](nil)
-	if v, ok := xnil.Try(); ok {
-		fmt.Println("explicit nil:", v)
-	}
-
-	num := 42
-	var set maybe.Ptr[int] = maybe.Some(&num)
-	if v, ok := set.Try(); ok {
-		fmt.Println("set:", *v)
-	}
-
-	// Output:
-	// unset: !ok
-	// explicit nil: <nil>
-	// set: 42
-}
diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go
index 08e1aa4b8ae8..cb9b2c0ef73d 100644
--- a/processor/deltatocumulativeprocessor/internal/metrics/data.go
+++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go
@@ -5,28 +5,10 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con
 
 import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
 )
 
-type Data[D data.Point[D]] interface {
-	At(i int) D
-	Len() int
-	Ident() Ident
-}
-
-type Filterable[D data.Point[D]] interface {
-	Data[D]
-	Filter(func(D) bool)
-}
-
 type Sum Metric
 
-func (s Sum) At(i int) data.Number {
-	dp := Metric(s).Sum().DataPoints().At(i)
-	return data.Number{NumberDataPoint: dp}
-}
-
 func (s Sum) Len() int {
 	return Metric(s).Sum().DataPoints().Len()
 }
@@ -35,23 +17,12 @@ func (s Sum) Ident() Ident {
 	return (*Metric)(&s).Ident()
 }
 
-func (s Sum) Filter(expr func(data.Number) bool) {
-	s.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
-		return !expr(data.Number{NumberDataPoint: dp})
-	})
-}
-
 func (s Sum) SetAggregationTemporality(at pmetric.AggregationTemporality) {
 	s.Sum().SetAggregationTemporality(at)
 }
 
 type Histogram Metric
 
-func (s Histogram) At(i int) data.Histogram {
-	dp := Metric(s).Histogram().DataPoints().At(i)
-	return data.Histogram{HistogramDataPoint: dp}
-}
-
 func (s Histogram) Len() int {
 	return Metric(s).Histogram().DataPoints().Len()
 }
@@ -60,23 +31,12 @@ func (s Histogram) Ident() Ident {
 	return (*Metric)(&s).Ident()
 }
 
-func (s Histogram) Filter(expr func(data.Histogram) bool) {
-	s.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
-		return !expr(data.Histogram{HistogramDataPoint: dp})
-	})
-}
-
 func (s Histogram) SetAggregationTemporality(at pmetric.AggregationTemporality) {
 	s.Histogram().SetAggregationTemporality(at)
 }
 
 type ExpHistogram Metric
 
-func (s ExpHistogram) At(i int) data.ExpHistogram {
-	dp := Metric(s).ExponentialHistogram().DataPoints().At(i)
-	return data.ExpHistogram{DataPoint: dp}
-}
-
 func (s ExpHistogram) Len() int {
 	return Metric(s).ExponentialHistogram().DataPoints().Len()
 }
@@ -85,23 +45,12 @@ func (s ExpHistogram) Ident() Ident {
 	return (*Metric)(&s).Ident()
 }
 
-func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) {
-	s.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
-		return !expr(data.ExpHistogram{DataPoint: dp})
-	})
-}
-
 func (s ExpHistogram) SetAggregationTemporality(at pmetric.AggregationTemporality) {
 	s.ExponentialHistogram().SetAggregationTemporality(at)
 }
 
 type Gauge Metric
 
-func (s Gauge) At(i int) data.Number {
-	dp := Metric(s).Gauge().DataPoints().At(i)
-	return data.Number{NumberDataPoint: dp}
-}
-
 func (s Gauge) Len() int {
 	return Metric(s).Gauge().DataPoints().Len()
 }
@@ -110,20 +59,10 @@ func (s Gauge) Ident() Ident {
 	return (*Metric)(&s).Ident()
 }
 
-func (s Gauge) Filter(expr func(data.Number) bool) {
-	s.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool {
-		return !expr(data.Number{NumberDataPoint: dp})
-	})
-}
 func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {}
 
 type Summary Metric
 
-func (s Summary) At(i int) data.Summary {
-	dp := Metric(s).Summary().DataPoints().At(i)
-	return data.Summary{SummaryDataPoint: dp}
-}
-
 func (s Summary) Len() int {
 	return Metric(s).Summary().DataPoints().Len()
 }
@@ -132,9 +71,4 @@ func (s Summary) Ident() Ident {
 	return (*Metric)(&s).Ident()
 }
 
-func (s Summary) Filter(expr func(data.Summary) bool) {
-	s.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
-		return !expr(data.Summary{SummaryDataPoint: dp})
-	})
-}
 func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {}
diff --git a/processor/deltatocumulativeprocessor/internal/metrics/iter.go b/processor/deltatocumulativeprocessor/internal/metrics/iter.go
index 9902d22a2eec..29d94051bfa2 100644
--- a/processor/deltatocumulativeprocessor/internal/metrics/iter.go
+++ b/processor/deltatocumulativeprocessor/internal/metrics/iter.go
@@ -5,26 +5,8 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con
 
 import (
 	"go.opentelemetry.io/collector/pdata/pmetric"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice"
 )
 
-func All(md pmetric.Metrics) func(func(Metric) bool) {
-	return func(yield func(Metric) bool) {
-		var ok bool
-		pslice.All(md.ResourceMetrics())(func(rm pmetric.ResourceMetrics) bool {
-			pslice.All(rm.ScopeMetrics())(func(sm pmetric.ScopeMetrics) bool {
-				pslice.All(sm.Metrics())(func(m pmetric.Metric) bool {
-					ok = yield(From(rm.Resource(), sm.Scope(), m))
-					return ok
-				})
-				return ok
-			})
-			return ok
-		})
-	}
-}
-
 func Filter(md pmetric.Metrics, keep func(Metric) bool) {
 	md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool {
 		rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool {
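The deleted `All` above is a push-style iterator: a function that takes a `yield` callback and stops early once `yield` returns false, which is exactly the shape Go 1.23's range-over-func expects. A self-contained sketch of the same pattern over a plain slice (not code from this PR):

```go
package main

import "fmt"

// All returns a push iterator over s: it calls yield per element and stops
// as soon as yield reports false, mirroring the nested pslice.All loops above.
func All[T any](s []T) func(func(T) bool) {
	return func(yield func(T) bool) {
		for _, v := range s {
			if !yield(v) {
				return
			}
		}
	}
}

func main() {
	for v := range All([]int{1, 2, 3, 4}) { // Go 1.23+: range over func
		if v > 2 {
			break // surfaces to the iterator as yield() == false
		}
		fmt.Println(v)
	}
}
```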
diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
index b19b03f1a1c7..d65e3b8341a0 100644
--- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
+++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
@@ -75,7 +75,6 @@ var (
 type Any interface {
 	Len() int
 	Ident() identity.Metric
-
 	SetAggregationTemporality(pmetric.AggregationTemporality)
 }
diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go
deleted file mode 100644
index 201dae8d884e..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/data.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-
-import (
-	"errors"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice"
-)
-
-func Datapoints[P data.Point[P], List metrics.Data[P]](dps List) func(func(identity.Stream, P) bool) {
-	return func(yield func(identity.Stream, P) bool) {
-		mid := dps.Ident()
-		pslice.All(dps)(func(dp P) bool {
-			id := identity.OfStream(mid, dp)
-			return yield(id, dp)
-		})
-	}
-}
-
-type filterable[D data.Point[D]] interface {
-	metrics.Data[D]
-	Filter(func(D) bool)
-}
-
-// Apply does dps[i] = fn(dps[i]) for each item in dps.
-// If fn returns [streams.Drop], the datapoint is removed from dps instead.
-// If fn returns another error, the datapoint is also removed and the error is
-// eventually returned
-func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, error)) error {
-	var errs error
-
-	mid := dps.Ident()
-	dps.Filter(func(dp P) bool {
-		id := identity.OfStream(mid, dp)
-		next, err := fn(id, dp)
-		if err != nil {
-			if !errors.Is(err, Drop) {
-				err = Error(id, err)
-				errs = errors.Join(errs, err)
-			}
-			return false
-		}
-
-		next.CopyTo(dp)
-		return true
-	})
-
-	return errs
-}
-
-// Drop signals the current item (stream or datapoint) is to be dropped
-var Drop = errors.New("stream dropped") //nolint:revive // Drop is a good name for a signal, see fs.SkipAll
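For reference, the deleted `Apply` was how aggregation was threaded through a metric's datapoints: the callback either rewrites a point in place, drops it via the `Drop` sentinel, or fails it (collected into the joined error). A hedged usage sketch; the NaN filter is illustrative, not from this PR:

```go
// Rewrite every point through an aggregator; silently drop unusable ones.
err := streams.Apply(dps, func(id streams.Ident, dp data.Number) (data.Number, error) {
	if math.IsNaN(dp.DoubleValue()) {
		return dp, streams.Drop // removed from dps, not treated as failure
	}
	return acc.Aggregate(id, dp) // other errors are wrapped per stream id
})
```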
diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go
deleted file mode 100644
index 2c42abb4b334..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/data_test.go
+++ /dev/null
@@ -1,142 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams_test
-
-import (
-	"math/rand"
-	"testing"
-
-	"github.com/stretchr/testify/require"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
-)
-
-var (
-	rdp data.Number
-	rid streams.Ident
-)
-
-func BenchmarkSamples(b *testing.B) {
-	b.Run("iterfn", func(b *testing.B) {
-		dps := generate(b.N)
-		b.ResetTimer()
-
-		streams.Datapoints(dps)(func(id streams.Ident, dp data.Number) bool {
-			rdp = dp
-			rid = id
-			return true
-		})
-	})
-
-	b.Run("iface", func(b *testing.B) {
-		dps := generate(b.N)
-		mid := dps.id.Metric()
-		b.ResetTimer()
-
-		for i := 0; i < dps.Len(); i++ {
-			dp := dps.At(i)
-			rid = identity.OfStream(mid, dp)
-			rdp = dp
-		}
-	})
-
-	b.Run("loop", func(b *testing.B) {
-		dps := generate(b.N)
-		mid := dps.id.Metric()
-		b.ResetTimer()
-
-		for i := range dps.dps {
-			dp := dps.dps[i]
-			rid = identity.OfStream(mid, dp)
-			rdp = dp
-		}
-	})
-}
-
-func TestAggregate(t *testing.T) {
-	const total = 1000
-	dps := generate(total)
-
-	// inv aggregator inverts each sample
-	inv := aggr(func(_ streams.Ident, n data.Number) (data.Number, error) {
-		dp := n.Clone()
-		dp.SetIntValue(-dp.IntValue())
-		return dp, nil
-	})
-
-	err := streams.Apply(dps, inv.Aggregate)
-	require.NoError(t, err)
-
-	// check that all samples are inverted
-	for i := 0; i < total; i++ {
-		require.Equal(t, int64(-i), dps.dps[i].IntValue())
-	}
-}
-
-func TestDrop(t *testing.T) {
-	const total = 1000
-	dps := generate(total)
-
-	var want []data.Number
-	maybe := aggr(func(_ streams.Ident, dp data.Number) (data.Number, error) {
-		if rand.Intn(2) == 1 {
-			want = append(want, dp)
-			return dp, nil
-		}
-		return dp, streams.Drop
-	})
-
-	err := streams.Apply(dps, maybe.Aggregate)
-	require.NoError(t, err)
-
-	require.Equal(t, want, dps.dps)
-}
-
-func generate(n int) *Data {
-	id, ndp := random.Sum().Stream()
-	dps := Data{id: id, dps: make([]data.Number, n)}
-	for i := range dps.dps {
-		dp := ndp.Clone()
-		dp.SetIntValue(int64(i))
-		dps.dps[i] = dp
-	}
-	return &dps
-}
-
-type Data struct {
-	id  streams.Ident
-	dps []data.Number
-}
-
-func (l Data) At(i int) data.Number {
-	return l.dps[i]
-}
-
-func (l Data) Len() int {
-	return len(l.dps)
-}
-
-func (l Data) Ident() metrics.Ident {
-	return l.id.Metric()
-}
-
-func (l *Data) Filter(expr func(data.Number) bool) {
-	var next []data.Number
-	for _, dp := range l.dps {
-		if expr(dp) {
-			next = append(next, dp)
-		}
-	}
-	l.dps = next
-}
-
-type aggr func(streams.Ident, data.Number) (data.Number, error)
-
-func (a aggr) Aggregate(id streams.Ident, dp data.Number) (data.Number, error) {
-	return a(id, dp)
-}
diff --git a/processor/deltatocumulativeprocessor/internal/streams/errors.go b/processor/deltatocumulativeprocessor/internal/streams/errors.go
deleted file mode 100644
index c0638e091502..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/errors.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-
-import (
-	"fmt"
-)
-
-func Error(id Ident, err error) error {
-	return StreamErr{Ident: id, Err: err}
-}
-
-type StreamErr struct {
-	Ident Ident
-	Err   error
-}
-
-func (e StreamErr) Error() string {
-	return fmt.Sprintf("%s: %s", e.Ident, e.Err)
-}
-
-func (e StreamErr) Unwrap() error {
-	return e.Err
-}
diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go
deleted file mode 100644
index dd1d927687c9..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/limit.go
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-
-import (
-	"errors"
-	"fmt"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
-)
-
-func Limit[T any](m Map[T], max int) LimitMap[T] {
-	return LimitMap[T]{
-		Map: m, Max: max,
-		Evictor: EvictorFunc(func() (identity.Stream, bool) {
-			return identity.Stream{}, false
-		}),
-	}
-}
-
-type LimitMap[T any] struct {
-	Max int
-
-	Evictor streams.Evictor
-	streams.Map[T]
-}
-
-func (m LimitMap[T]) Store(id identity.Stream, v T) error {
-	_, exist := m.Map.Load(id)
-
-	var errEv error
-	// if not already tracked and no space: try to evict
-	if !exist && m.Map.Len() >= m.Max {
-		errl := ErrLimit(m.Max)
-		gone, ok := m.Evictor.Evict()
-		if !ok {
-			// if no eviction possible, fail as there is no space
-			return errl
-		}
-		errEv = ErrEvicted{ErrLimit: errl, Ident: gone}
-	}
-
-	// there was space, or we made space: store it
-	if err := m.Map.Store(id, v); err != nil {
-		return err
-	}
-
-	// we may have evicted something, let the caller know
-	return errEv
-}
-
-type ErrLimit int
-
-func (e ErrLimit) Error() string {
-	return fmt.Sprintf("stream limit of %d reached", e)
-}
-
-func AtLimit(err error) bool {
-	var errLimit ErrLimit
-	return errors.As(err, &errLimit)
-}
-
-type ErrEvicted struct {
-	ErrLimit
-	Ident Ident
-}
-
-func (e ErrEvicted) Error() string {
-	return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident)
-}
-
-type EvictorFunc func() (identity.Stream, bool)
-
-func (ev EvictorFunc) Evict() (identity.Stream, bool) {
-	return ev()
-}
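How the deleted limiter composed, as exercised by its tests below: wrap any `streams.Map` in `Limit`, and optionally attach an `Evictor` so that hitting the cap evicts an old stream instead of rejecting the new one. A sketch using names from the deleted code (`myEvictor`, `id`, `dp` are placeholders); note that `ErrEvicted` embeds `ErrLimit`, so it must be checked first:

```go
lim := streams.Limit(make(exp.HashMap[data.Number]), 1000)
lim.Evictor = myEvictor // optional: without it, the 1001st stream gets ErrLimit

var evicted streams.ErrEvicted
err := lim.Store(id, dp)
switch {
case err == nil:
	// stored: space was available
case errors.As(err, &evicted):
	// stored, but evicted.Ident was removed to make room
case streams.AtLimit(err):
	// rejected: map full and nothing evictable
}
```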
diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go
deleted file mode 100644
index 380f657eb227..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams_test
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
-)
-
-func TestLimit(t *testing.T) {
-	sum := random.Sum()
-
-	items := make(exp.HashMap[data.Number])
-	lim := streams.Limit(items, 10)
-
-	ids := make([]identity.Stream, 10)
-	dps := make([]data.Number, 10)
-
-	// write until limit must work
-	for i := 0; i < 10; i++ {
-		id, dp := sum.Stream()
-		ids[i] = id
-		dps[i] = dp
-		err := lim.Store(id, dp)
-		require.NoError(t, err)
-	}
-
-	// one over limit must be rejected
-	{
-		id, dp := sum.Stream()
-		err := lim.Store(id, dp)
-		want := streams.ErrLimit(10)
-		require.ErrorAs(t, err, &want)
-		require.True(t, streams.AtLimit(err))
-	}
-
-	// write to existing must work
-	{
-		err := lim.Store(ids[3], dps[3])
-		require.NoError(t, err)
-	}
-
-	// after removing one, must be accepted again
-	{
-		lim.Delete(ids[0])
-
-		id, dp := sum.Stream()
-		err := lim.Store(id, dp)
-		require.NoError(t, err)
-	}
-}
-
-func TestLimitEvict(t *testing.T) {
-	sum := random.Sum()
-	evictable := make(map[identity.Stream]struct{})
-
-	items := make(exp.HashMap[data.Number])
-	lim := streams.Limit(items, 5)
-
-	ids := make([]identity.Stream, 10)
-	lim.Evictor = streams.EvictorFunc(func() (identity.Stream, bool) {
-		for _, id := range ids {
-			if _, ok := evictable[id]; ok {
-				delete(evictable, id)
-				return id, true
-			}
-		}
-		return identity.Stream{}, false
-	})
-
-	dps := make([]data.Number, 10)
-	for i := 0; i < 10; i++ {
-		id, dp := sum.Stream()
-		ids[i] = id
-		dps[i] = dp
-	}
-
-	// store up to limit must work
-	for i := 0; i < 5; i++ {
-		err := lim.Store(ids[i], dps[i])
-		require.NoError(t, err)
-	}
-
-	// store beyond limit must fail
-	for i := 5; i < 10; i++ {
-		err := lim.Store(ids[i], dps[i])
-		require.Equal(t, streams.ErrLimit(5), err)
-	}
-
-	// put two streams up for eviction
-	evictable[ids[2]] = struct{}{}
-	evictable[ids[3]] = struct{}{}
-
-	// while evictable do so, fail again afterwards
-	for i := 5; i < 10; i++ {
-		err := lim.Store(ids[i], dps[i])
-		if i < 7 {
-			require.Equal(t, streams.ErrEvicted{ErrLimit: streams.ErrLimit(5), Ident: ids[i-3]}, err)
-		} else {
-			require.Equal(t, streams.ErrLimit(5), err)
-		}
-	}
-}
diff --git a/processor/deltatocumulativeprocessor/internal/streams/streams.go b/processor/deltatocumulativeprocessor/internal/streams/streams.go
deleted file mode 100644
index 1b34f806b272..000000000000
--- a/processor/deltatocumulativeprocessor/internal/streams/streams.go
+++ /dev/null
@@ -1,37 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-
-import (
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-)
-
-type Ident = identity.Stream
-
-type (
-	Seq[T any] streams.Seq[T]
-	Map[T any] streams.Map[T]
-)
-
-type Aggregator[D data.Point[D]] interface {
-	Aggregate(Ident, D) (D, error)
-}
-
-func IntoAggregator[D data.Point[D]](m Map[D]) MapAggr[D] {
-	return MapAggr[D]{Map: m}
-}
-
-type MapAggr[D data.Point[D]] struct {
-	Map[D]
-}
-
-func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) {
-	err := a.Map.Store(id, dp)
-	v, _ := a.Map.Load(id)
-	return v, err
-}
-
-type Evictor = streams.Evictor
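The deleted `IntoAggregator` is a small adapter: any `Map` whose `Store` merges incoming points (as the old `delta.Accumulator` did) becomes an `Aggregator` that returns the post-merge value. The deleted delta tests above used it exactly this way:

```go
acc := streams.IntoAggregator(delta.New[data.Number]())
v, err := acc.Aggregate(id, dp) // Store merges dp, Load returns the running total
```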
"go.opentelemetry.io/otel/attribute" diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go deleted file mode 100644 index f159ba11dc83..000000000000 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package telemetry_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" -) - -// TestFaults verifies certain non-fatal errors are actually caused and -// subsequently dropped. It does so by writing bad samples to the actual -// implementation instead of fabricating errors manually. -func TestFaults(t *testing.T) { - type Map = streams.Map[data.Number] - type Case struct { - Name string - Map Map - // data preparation, etc - Pre func(Map, identity.Stream, data.Number) error - // cause an error - Bad func(Map, identity.Stream, data.Number) error - // expected error that was caused - Err error - // expected return above error was converted into - Want error - } - - sum := random.Sum() - evid, evdp := sum.Stream() - - cases := []Case{ - { - Name: "older-start", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(20)) - dp.SetTimestamp(ts(30)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(10)) - dp.SetTimestamp(ts(40)) - return dps.Store(id, dp) - }, - Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)}, - Want: streams.Drop, - }, - { - Name: "out-of-order", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(10)) - return dps.Store(id, dp) - }, - Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)}, - Want: streams.Drop, - }, - { - Name: "gap", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(10)) - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(30)) - dp.SetTimestamp(ts(40)) - return dps.Store(id, dp) - }, - Err: delta.ErrGap{From: ts(20), To: ts(30)}, - Want: nil, - }, - { - Name: "limit", - Map: streams.Limit(delta.New[data.Number](), 1), - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(10)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, _ identity.Stream, _ data.Number) error { - id, dp := 
diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go
deleted file mode 100644
index f159ba11dc83..000000000000
--- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go
+++ /dev/null
@@ -1,175 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package telemetry_test
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/pdata/pcommon"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
-)
-
-// TestFaults verifies certain non-fatal errors are actually caused and
-// subsequently dropped. It does so by writing bad samples to the actual
-// implementation instead of fabricating errors manually.
-func TestFaults(t *testing.T) {
-	type Map = streams.Map[data.Number]
-	type Case struct {
-		Name string
-		Map  Map
-		// data preparation, etc
-		Pre func(Map, identity.Stream, data.Number) error
-		// cause an error
-		Bad func(Map, identity.Stream, data.Number) error
-		// expected error that was caused
-		Err error
-		// expected return above error was converted into
-		Want error
-	}
-
-	sum := random.Sum()
-	evid, evdp := sum.Stream()
-
-	cases := []Case{
-		{
-			Name: "older-start",
-			Pre: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetStartTimestamp(ts(20))
-				dp.SetTimestamp(ts(30))
-				return dps.Store(id, dp)
-			},
-			Bad: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetStartTimestamp(ts(10))
-				dp.SetTimestamp(ts(40))
-				return dps.Store(id, dp)
-			},
-			Err:  delta.ErrOlderStart{Start: ts(20), Sample: ts(10)},
-			Want: streams.Drop,
-		},
-		{
-			Name: "out-of-order",
-			Pre: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetTimestamp(ts(20))
-				return dps.Store(id, dp)
-			},
-			Bad: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetTimestamp(ts(10))
-				return dps.Store(id, dp)
-			},
-			Err:  delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)},
-			Want: streams.Drop,
-		},
-		{
-			Name: "gap",
-			Pre: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetStartTimestamp(ts(10))
-				dp.SetTimestamp(ts(20))
-				return dps.Store(id, dp)
-			},
-			Bad: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetStartTimestamp(ts(30))
-				dp.SetTimestamp(ts(40))
-				return dps.Store(id, dp)
-			},
-			Err:  delta.ErrGap{From: ts(20), To: ts(30)},
-			Want: nil,
-		},
-		{
-			Name: "limit",
-			Map:  streams.Limit(delta.New[data.Number](), 1),
-			Pre: func(dps Map, id identity.Stream, dp data.Number) error {
-				dp.SetTimestamp(ts(10))
-				return dps.Store(id, dp)
-			},
-			Bad: func(dps Map, _ identity.Stream, _ data.Number) error {
-				id, dp := sum.Stream()
-				dp.SetTimestamp(ts(20))
-				return dps.Store(id, dp)
-			},
-			Err:  streams.ErrLimit(1),
-			Want: streams.Drop, // we can't ignore being at limit, we need to drop the entire stream for this request
-		},
-		{
-			Name: "evict",
-			Map: func() Map {
-				ev := HeadEvictor[data.Number]{Map: delta.New[data.Number]()}
-				lim := streams.Limit(ev, 1)
-				lim.Evictor = ev
-				return lim
-			}(),
-			Pre: func(dps Map, _ identity.Stream, _ data.Number) error {
-				evdp.SetTimestamp(ts(10))
-				return dps.Store(evid, evdp)
-			},
-			Bad: func(dps Map, _ identity.Stream, _ data.Number) error {
-				id, dp := sum.Stream()
-				dp.SetTimestamp(ts(20))
-				return dps.Store(id, dp)
-			},
-			Err:  streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)},
-			Want: nil,
-		},
-	}
-
-	telb, err := metadata.NewTelemetryBuilder(componenttest.NewNopTelemetrySettings())
-	require.NoError(t, err)
-
-	for _, c := range cases {
-		t.Run(c.Name, func(t *testing.T) {
-			id, dp := sum.Stream()
-			tel := telemetry.New(telb)
-
-			dps := c.Map
-			if dps == nil {
-				dps = delta.New[data.Number]()
-			}
-			var realErr error
-			dps = errGrab[data.Number]{Map: dps, err: &realErr}
-			dps = telemetry.ObserveNonFatal(dps, &tel.Metrics)
-
-			if c.Pre != nil {
-				err := c.Pre(dps, id, dp.Clone())
-				require.NoError(t, err)
-			}
-
-			err := c.Bad(dps, id, dp.Clone())
-			require.Equal(t, c.Err, realErr)
-			require.Equal(t, c.Want, err)
-		})
-	}
-}
-
-type ts = pcommon.Timestamp
-
-// HeadEvictor drops the first stream on Evict()
-type HeadEvictor[T any] struct{ streams.Map[T] }
-
-func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) {
-	e.Items()(func(id identity.Stream, _ T) bool {
-		e.Delete(id)
-		evicted = id
-		return false
-	})
-	return evicted, true
-}
-
-// errGrab stores any error that happens on Store() for later inspection
-type errGrab[T any] struct {
-	streams.Map[T]
-	err *error
-}
-
-func (e errGrab[T]) Store(id identity.Stream, dp T) error {
-	*e.err = e.Map.Store(id, dp)
-	return *e.err
-}
diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
index 8062fc8388a8..ab6fde655005 100644
--- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
+++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
@@ -6,156 +6,66 @@ package telemetry // import "github.com/open-telemetry/opentelemetry-collector-c
 import (
 	"context"
 	"errors"
-	"time"
+	"reflect"
 
+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
 )
 
-type Telemetry struct {
-	Metrics
-}
+func New(set component.TelemetrySettings) (Metrics, error) {
+	zero := func() int { return -1 }
+	m := Metrics{
+		tracked: &zero,
+	}
 
-func New(telb *metadata.TelemetryBuilder) Telemetry {
-	return Telemetry{Metrics: Metrics{
-		streams: Streams{
-			tracked: telb.DeltatocumulativeStreamsTracked,
-			limit:   telb.DeltatocumulativeStreamsLimit,
-			evicted: telb.DeltatocumulativeStreamsEvicted,
-			stale:   telb.DeltatocumulativeStreamsMaxStale,
-		},
-		dps: Datapoints{
-			total:   telb.DeltatocumulativeDatapointsProcessed,
-			dropped: telb.DeltatocumulativeDatapointsDropped,
-		},
-		gaps: telb.DeltatocumulativeGapsLength,
-	}}
-}
+	trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 {
+		return int64((*m.tracked)())
+	})
 
-type Streams struct {
-	tracked metric.Int64UpDownCounter
-	limit   metric.Int64Gauge
-	evicted metric.Int64Counter
-	stale   metric.Int64Gauge
-}
+	telb, err := metadata.NewTelemetryBuilder(set, trackedCb)
+	if err != nil {
+		return Metrics{}, err
+	}
+	m.TelemetryBuilder = *telb
 
-type Datapoints struct {
-	total   metric.Int64Counter
-	dropped metric.Int64Counter
+	return m, nil
 }
 
 type Metrics struct {
-	streams Streams
-	dps     Datapoints
-
-	gaps metric.Int64Counter
-}
+	metadata.TelemetryBuilder
 
-func (tel Telemetry) WithLimit(max int64) {
-	tel.streams.limit.Record(context.Background(), max)
+	tracked *func() int
 }
 
-func (tel Telemetry) WithStale(max time.Duration) {
-	tel.streams.stale.Record(context.Background(), int64(max.Seconds()))
+func (m Metrics) Datapoints() Counter {
+	return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear}
 }
 
-func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] {
-	return Items[T]{
-		Map:     items,
-		Metrics: metrics,
-	}
-}
-
-func ObserveNonFatal[T any](items streams.Map[T], metrics *Metrics) Faults[T] {
-	return Faults[T]{
-		Map:     items,
-		Metrics: metrics,
-	}
+func (m *Metrics) WithTracked(streams func() int) {
+	*m.tracked = streams
 }
 
-type Items[T any] struct {
-	streams.Map[T]
-	*Metrics
+func Error(msg string) attribute.KeyValue {
+	return attribute.String("error", msg)
 }
 
-func (i Items[T]) Store(id streams.Ident, v T) error {
-	inc(i.dps.total)
-
-	_, old := i.Map.Load(id)
-	err := i.Map.Store(id, v)
-	if err == nil && !old {
-		inc(i.streams.tracked)
+func Cause(err error) attribute.KeyValue {
+	for {
+		uw := errors.Unwrap(err)
+		if uw == nil {
+			break
+		}
+		err = uw
 	}
-	return err
-}
-
-func (i Items[T]) Delete(id streams.Ident) {
-	dec(i.streams.tracked)
-	i.Map.Delete(id)
+	return Error(reflect.TypeOf(err).String())
 }
 
-type Faults[T any] struct {
-	streams.Map[T]
-	*Metrics
-}
-
-func (f Faults[T]) Store(id streams.Ident, v T) error {
-	var (
-		olderStart delta.ErrOlderStart
-		outOfOrder delta.ErrOutOfOrder
-		gap        delta.ErrGap
-		limit      streams.ErrLimit
-		evict      streams.ErrEvicted
-	)
-
-	err := f.Map.Store(id, v)
-	switch {
-	default:
-		return err
-	case errors.As(err, &olderStart):
-		inc(f.dps.dropped, reason("older-start"))
-		return streams.Drop
-	case errors.As(err, &outOfOrder):
-		inc(f.dps.dropped, reason("out-of-order"))
-		return streams.Drop
-	case errors.As(err, &limit):
-		inc(f.dps.dropped, reason("stream-limit"))
-		// no space to store stream, drop it instead of failing silently
-		return streams.Drop
-	case errors.As(err, &evict):
-		inc(f.streams.evicted)
-	case errors.As(err, &gap):
-		from := gap.From.AsTime()
-		to := gap.To.AsTime()
-		lost := to.Sub(from).Seconds()
-		f.gaps.Add(context.TODO(), int64(lost))
-	}
-
-	return nil
-}
-
-var (
-	_ streams.Map[any] = (*Items[any])(nil)
-	_ streams.Map[any] = (*Faults[any])(nil)
-)
-
-type addable[Opts any] interface {
-	Add(context.Context, int64, ...Opts)
-}
-
-func inc[A addable[O], O any](a A, opts ...O) {
-	a.Add(context.Background(), 1, opts...)
-}
-
-func dec[A addable[O], O any](a A, opts ...O) {
-	a.Add(context.Background(), -1, opts...)
-}
-
-func reason(reason string) metric.AddOption {
-	return metric.WithAttributes(attribute.String("reason", reason))
+type Counter struct{ metric.Int64Counter }
+
+func (c Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) {
+	c.Add(ctx, 1, metric.WithAttributes(attrs...))
 }
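The surviving helpers above are what the processor uses when counting datapoints: `Cause` walks `errors.Unwrap` down to the root error and records its Go type as an `error` attribute. A usage sketch consistent with the deleted `linear.go` further down (`aggregate` is a placeholder for the actual accumulation call):

```go
var attrs telemetry.Attributes // deferred, so it records whatever is set at return time
defer func() { tel.Datapoints().Inc(ctx, attrs...) }()

if err := aggregate(dp); err != nil {
	attrs.Set(telemetry.Cause(err)) // e.g. error="delta.ErrOutOfOrder"
	return drop
}
return keep
```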
diff --git a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go
deleted file mode 100644
index ca0642cf8795..000000000000
--- a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package random
-
-import (
-	"math"
-	"math/rand"
-	"strconv"
-	"time"
-
-	"go.opentelemetry.io/collector/pdata/pcommon"
-	"go.opentelemetry.io/collector/pdata/pmetric"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
-)
-
-type Point[Self any] interface {
-	data.Typed[Self]
-
-	SetTimestamp(pcommon.Timestamp)
-}
-
-type Metric[P Point[P]] struct {
-	metrics.Metric
-}
-
-func New[P Point[P]]() Metric[P] {
-	metric := pmetric.NewMetric()
-	metric.SetName(randStr())
-	metric.SetDescription(randStr())
-	metric.SetUnit(randStr())
-	return Metric[P]{Metric: metrics.From(Resource(), Scope(), metric)}
-}
-
-func Sum() Metric[data.Number] {
-	metric := New[data.Number]()
-	metric.SetEmptySum()
-	return metric
-}
-
-func Histogram() Metric[data.Histogram] {
-	metric := New[data.Histogram]()
-	metric.SetEmptyHistogram()
-	return metric
-}
-
-func Exponential() Metric[data.ExpHistogram] {
-	metric := New[data.ExpHistogram]()
-	metric.SetEmptyExponentialHistogram()
-	return metric
-}
-
-func (m Metric[P]) Stream() (streams.Ident, P) {
-	var dp P = data.Zero[P]()
-
-	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
-
-	for i := 0; i < 10; i++ {
-		dp.Attributes().PutStr(randStr(), randStr())
-	}
-	id := identity.OfStream(m.Ident(), dp)
-
-	return id, dp
-}
-
-func Resource() pcommon.Resource {
-	return ResourceN(10)
-}
-
-func ResourceN(n int) pcommon.Resource {
-	res := pcommon.NewResource()
-	Attributes(n).MoveTo(res.Attributes())
-	return res
-}
-
-func Scope() pcommon.InstrumentationScope {
-	return ScopeN(3)
-}
-
-func ScopeN(n int) pcommon.InstrumentationScope {
-	scope := pcommon.NewInstrumentationScope()
-	scope.SetName(randStr())
-	scope.SetVersion(randStr())
-	Attributes(n).MoveTo(scope.Attributes())
-	return scope
-}
-
-func Attributes(n int) pcommon.Map {
-	m := pcommon.NewMap()
-	for i := 0; i < n; i++ {
-		m.PutStr(randStr(), randStr())
-	}
-	return m
-}
-
-func randStr() string {
-	return strconv.FormatInt(randInt(), 16)
-}
-
-func randInt() int64 {
-	return int64(rand.Intn(math.MaxInt16))
-}
-
-func randFloat() float64 {
-	return float64(randInt()) / float64(randInt())
-}
diff --git a/processor/deltatocumulativeprocessor/internal/testar/crlf/crlf.go b/processor/deltatocumulativeprocessor/internal/testing/testar/crlf/crlf.go
similarity index 96%
rename from processor/deltatocumulativeprocessor/internal/testar/crlf/crlf.go
rename to processor/deltatocumulativeprocessor/internal/testing/testar/crlf/crlf.go
index eed8ec7a1c33..bc00f79f978f 100644
--- a/processor/deltatocumulativeprocessor/internal/testar/crlf/crlf.go
+++ b/processor/deltatocumulativeprocessor/internal/testing/testar/crlf/crlf.go
@@ -1,7 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
-package crlf // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar/crlf"
+package crlf // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/testar/crlf"
 
 import (
 	"bytes"
diff --git a/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go b/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go
index 2d01f34174c2..6b7cdd1a7f45 100644
--- a/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go
+++ b/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go
@@ -27,7 +27,7 @@ import (
 
 	"golang.org/x/tools/txtar"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar/crlf"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/testar/crlf"
 )
 
 // Read archive data into the fields of struct *T
diff --git a/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go b/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go
index ffbe38215efb..ac65a7497b86 100644
--- a/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go
+++ b/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go
@@ -9,7 +9,7 @@ import (
 	"strings"
 	"testing"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar/crlf"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/testar/crlf"
 )
 
 func ExampleRead() {
diff --git a/processor/deltatocumulativeprocessor/linear.go b/processor/deltatocumulativeprocessor/linear.go
deleted file mode 100644
index 2b725b7dc78d..000000000000
--- a/processor/deltatocumulativeprocessor/linear.go
+++ /dev/null
@@ -1,214 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
-
-import (
-	"context"
-	"sync"
-	"time"
-
-	"go.opentelemetry.io/collector/component"
-	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/pdata/pmetric"
-	"go.opentelemetry.io/collector/processor"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
-	telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
-)
-
-var _ processor.Metrics = (*Linear)(nil)
-
-type Linear struct {
-	next consumer.Metrics
-	cfg  Config
-
-	last state
-	mtx  sync.Mutex
-
-	ctx    context.Context
-	cancel context.CancelFunc
-
-	stale staleness.Tracker
-	tel   telemetry.Metrics
-}
-
-func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linear {
-	ctx, cancel := context.WithCancel(context.Background())
-
-	proc := Linear{
-		next: next,
-		cfg:  *cfg,
-		last: state{
-			nums: make(map[identity.Stream]pmetric.NumberDataPoint),
-			hist: make(map[identity.Stream]pmetric.HistogramDataPoint),
-			expo: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint),
-		},
-		ctx:    ctx,
-		cancel: cancel,
-
-		stale: staleness.NewTracker(),
-		tel:   tel,
-	}
-
-	tel.WithTracked(proc.last.Len)
-	cfg.Metrics(tel)
-
-	return &proc
-}
-
-func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
-
-	now := time.Now()
-
-	const (
-		keep = true
-		drop = false
-	)
-
-	metrics.Filter(md, func(m metrics.Metric) bool {
-		if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta {
-			return keep
-		}
-
-		// aggregate the datapoints.
-		// using filter here, as the pmetric.*DataPoint are reference types so
-		// we can modify them using their "value".
-		m.Filter(func(id identity.Stream, dp any) bool {
-			// count the processed datatype.
-			// uses whatever value of attrs has at return-time
-			var attrs telemetry.Attributes
-			defer func() { p.tel.Datapoints().Inc(ctx, attrs...) }()
-
-			// if stream new and state capacity reached, reject
-			exist := p.last.Has(id)
-			if !exist && p.last.Len() >= p.cfg.MaxStreams {
-				attrs.Set(telemetry.Error("limit"))
-				return drop
-			}
-
-			// stream is ok and active, update stale tracker
-			p.stale.Refresh(now, id)
-
-			// this is the first sample of the stream. there is nothing to
-			// aggregate with, so clone this value into the state and done
-			if !exist {
-				p.last.BeginWith(id, dp)
-				return keep
-			}
-
-			// aggregate with state from previous requests.
-			// delta.AccumulateInto(state, dp) stores result in `state`.
-			// this is then copied into `dp` (the value passed onto the pipeline)
-			var err error
-			switch dp := dp.(type) {
-			case pmetric.NumberDataPoint:
-				state := p.last.nums[id]
-				err = delta.AccumulateInto(state, dp)
-				state.CopyTo(dp)
-			case pmetric.HistogramDataPoint:
-				state := p.last.hist[id]
-				err = delta.AccumulateInto(state, dp)
-				state.CopyTo(dp)
-			case pmetric.ExponentialHistogramDataPoint:
-				state := p.last.expo[id]
-				err = delta.AccumulateInto(state, dp)
-				state.CopyTo(dp)
-			}
-			if err != nil {
-				attrs.Set(telemetry.Cause(err))
-				return drop
-			}
-
-			return keep
-		})
-
-		// all remaining datapoints of this metric are now cumulative
-		m.Typed().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
-		// if no datapoints remain, drop empty metric
-		return m.Typed().Len() > 0
-	})
-
-	// no need to continue pipeline if we dropped all metrics
-	if md.MetricCount() == 0 {
-		return nil
-	}
-	return p.next.ConsumeMetrics(ctx, md)
-}
-
-func (p *Linear) Start(_ context.Context, _ component.Host) error {
-	if p.cfg.MaxStale != 0 {
-		// delete stale streams once per minute
-		go func() {
-			tick := time.NewTicker(time.Minute)
-			defer tick.Stop()
-			for {
-				select {
-				case <-p.ctx.Done():
-					return
-				case <-tick.C:
-					p.mtx.Lock()
-					stale := p.stale.Collect(p.cfg.MaxStale)
-					for _, id := range stale {
-						p.last.Delete(id)
-					}
-					p.mtx.Unlock()
-				}
-			}
-		}()
-	}
-
-	return nil
-}
-
-func (p *Linear) Shutdown(_ context.Context) error {
-	p.cancel()
-	return nil
-}
-
-func (p *Linear) Capabilities() consumer.Capabilities {
-	return consumer.Capabilities{MutatesData: true}
-}
-
-// state keeps a cumulative value, aggregated over time, per stream
-type state struct {
-	nums map[identity.Stream]pmetric.NumberDataPoint
-	hist map[identity.Stream]pmetric.HistogramDataPoint
-	expo map[identity.Stream]pmetric.ExponentialHistogramDataPoint
-}
-
-func (m state) Len() int {
-	return len(m.nums) + len(m.hist) + len(m.expo)
-}
-
-func (m state) Has(id identity.Stream) bool {
-	_, nok := m.nums[id]
-	_, hok := m.hist[id]
-	_, eok := m.expo[id]
-	return nok || hok || eok
-}
-
-func (m state) Delete(id identity.Stream) {
-	delete(m.nums, id)
-	delete(m.hist, id)
-	delete(m.expo, id)
-}
-
-func (m state) BeginWith(id identity.Stream, dp any) {
-	switch dp := dp.(type) {
-	case pmetric.NumberDataPoint:
-		m.nums[id] = pmetric.NewNumberDataPoint()
-		dp.CopyTo(m.nums[id])
-	case pmetric.HistogramDataPoint:
-		m.hist[id] = pmetric.NewHistogramDataPoint()
-		dp.CopyTo(m.hist[id])
-	case pmetric.ExponentialHistogramDataPoint:
-		m.expo[id] = pmetric.NewExponentialHistogramDataPoint()
-		dp.CopyTo(m.expo[id])
-	}
-}
diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go
index 219f657df964..149431b8970c 100644
--- a/processor/deltatocumulativeprocessor/processor.go
+++ b/processor/deltatocumulativeprocessor/processor.go
@@ -5,7 +5,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele
 
 import (
 	"context"
-	"errors"
 	"sync"
 	"time"
 
@@ -13,15 +12,11 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/processor"
-	"go.uber.org/zap"
 
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
 )
 
@@ -29,92 +24,145 @@ var _ processor.Metrics = (*Processor)(nil)
 
 type Processor struct {
 	next consumer.Metrics
+	cfg  Config
+
+	last state
+	mtx  sync.Mutex
 
-	log    *zap.Logger
 	ctx    context.Context
 	cancel context.CancelFunc
 
-	sums Pipeline[data.Number]
-	expo Pipeline[data.ExpHistogram]
-	hist Pipeline[data.Histogram]
-
-	mtx sync.Mutex
+	stale staleness.Tracker
+	tel   telemetry.Metrics
 }
 
-func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, next consumer.Metrics) *Processor {
+func newProcessor(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Processor {
 	ctx, cancel := context.WithCancel(context.Background())
 
-	tel := telemetry.New(telb)
 	proc := Processor{
-		log:    log,
+		next: next,
+		cfg:  *cfg,
+		last: state{
+			nums: make(map[identity.Stream]pmetric.NumberDataPoint),
+			hist: make(map[identity.Stream]pmetric.HistogramDataPoint),
+			expo: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint),
+		},
 		ctx:    ctx,
 		cancel: cancel,
-		next:   next,
-		sums:   pipeline[data.Number](cfg, &tel),
-		expo:   pipeline[data.ExpHistogram](cfg, &tel),
-		hist:   pipeline[data.Histogram](cfg, &tel),
+
+		stale: staleness.NewTracker(),
+		tel:   tel,
 	}
 
+	tel.WithTracked(proc.last.Len)
+	cfg.Metrics(tel)
+
 	return &proc
 }
 
-type Pipeline[D data.Point[D]] struct {
-	aggr  streams.Aggregator[D]
-	stale maybe.Ptr[staleness.Staleness[D]]
-}
+func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
 
-func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D] {
-	var pipe Pipeline[D]
+	now := time.Now()
 
-	var dps streams.Map[D]
-	dps = delta.New[D]()
-	dps = telemetry.ObserveItems(dps, &tel.Metrics)
+	const (
+		keep = true
+		drop = false
+	)
 
-	if cfg.MaxStale > 0 {
-		tel.WithStale(cfg.MaxStale)
-		stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps))
-		pipe.stale = stale
-		dps, _ = stale.Try()
-	}
-	if cfg.MaxStreams > 0 {
-		tel.WithLimit(int64(cfg.MaxStreams))
-		lim := streams.Limit(dps, cfg.MaxStreams)
-		if stale, ok := pipe.stale.Try(); ok {
-			lim.Evictor = stale
+	metrics.Filter(md, func(m metrics.Metric) bool {
+		if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta {
+			return keep
 		}
-		dps = lim
-	}
 
-	dps = telemetry.ObserveNonFatal(dps, &tel.Metrics)
+		// aggregate the datapoints.
+		// using filter here, as the pmetric.*DataPoint are reference types so
+		// we can modify them using their "value".
+		m.Filter(func(id identity.Stream, dp any) bool {
+			// count the processed datatype.
+			// uses whatever value of attrs has at return-time
+			var attrs telemetry.Attributes
+			defer func() { p.tel.Datapoints().Inc(ctx, attrs...) }()
+
+			// if stream new and state capacity reached, reject
+			exist := p.last.Has(id)
+			if !exist && p.last.Len() >= p.cfg.MaxStreams {
+				attrs.Set(telemetry.Error("limit"))
+				return drop
+			}
 
-	pipe.aggr = streams.IntoAggregator(dps)
-	return pipe
-}
+			// stream is ok and active, update stale tracker
+			p.stale.Refresh(now, id)
 
-func (p *Processor) Start(_ context.Context, _ component.Host) error {
-	sums, sok := p.sums.stale.Try()
-	expo, eok := p.expo.stale.Try()
-	hist, hok := p.hist.stale.Try()
-	if !(sok && eok && hok) {
+			// this is the first sample of the stream. there is nothing to
+			// aggregate with, so clone this value into the state and done
+			if !exist {
+				p.last.BeginWith(id, dp)
+				return keep
+			}
+
+			// aggregate with state from previous requests.
+			// delta.AccumulateInto(state, dp) stores result in `state`.
+			// this is then copied into `dp` (the value passed onto the pipeline)
+			var err error
+			switch dp := dp.(type) {
+			case pmetric.NumberDataPoint:
+				state := p.last.nums[id]
+				err = delta.AccumulateInto(state, dp)
+				state.CopyTo(dp)
+			case pmetric.HistogramDataPoint:
+				state := p.last.hist[id]
+				err = delta.AccumulateInto(state, dp)
+				state.CopyTo(dp)
+			case pmetric.ExponentialHistogramDataPoint:
+				state := p.last.expo[id]
+				err = delta.AccumulateInto(state, dp)
+				state.CopyTo(dp)
+			}
+			if err != nil {
+				attrs.Set(telemetry.Cause(err))
+				return drop
+			}
+
+			return keep
+		})
+
+		// all remaining datapoints of this metric are now cumulative
+		m.Typed().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		// if no datapoints remain, drop empty metric
+		return m.Typed().Len() > 0
+	})
+
+	// no need to continue pipeline if we dropped all metrics
+	if md.MetricCount() == 0 {
 		return nil
 	}
+	return p.next.ConsumeMetrics(ctx, md)
+}
 
-	go func() {
-		tick := time.NewTicker(time.Minute)
-		for {
-			select {
-			case <-p.ctx.Done():
-				return
-			case <-tick.C:
-				p.mtx.Lock()
-				sums.ExpireOldEntries()
-				expo.ExpireOldEntries()
-				hist.ExpireOldEntries()
-				p.mtx.Unlock()
+func (p *Processor) Start(_ context.Context, _ component.Host) error {
+	if p.cfg.MaxStale != 0 {
+		// delete stale streams once per minute
+		go func() {
+			tick := time.NewTicker(time.Minute)
+			defer tick.Stop()
+			for {
+				select {
+				case <-p.ctx.Done():
+					return
+				case <-tick.C:
+					p.mtx.Lock()
+					stale := p.stale.Collect(p.cfg.MaxStale)
+					for _, id := range stale {
+						p.last.Delete(id)
+					}
+					p.mtx.Unlock()
+				}
 			}
-		}
-	}()
+		}()
+	}
+
 	return nil
 }
@@ -127,56 +175,40 @@ func (p *Processor) Capabilities() consumer.Capabilities {
 	return consumer.Capabilities{MutatesData: true}
 }
 
-func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
-	if err := context.Cause(p.ctx); err != nil {
-		return err
-	}
+// state keeps a cumulative value, aggregated over time, per stream
+type state struct {
+	nums map[identity.Stream]pmetric.NumberDataPoint
+	hist map[identity.Stream]pmetric.HistogramDataPoint
+	expo map[identity.Stream]pmetric.ExponentialHistogramDataPoint
+}
 
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
+func (m state) Len() int {
+	return len(m.nums) + len(m.hist) + len(m.expo)
+}
 
-	var errs error
-	metrics.Filter(md, func(m metrics.Metric) bool {
-		var n int
-		//exhaustive:enforce
-		switch m.Type() {
-		case pmetric.MetricTypeGauge:
-			n = m.Gauge().DataPoints().Len()
-		case pmetric.MetricTypeSum:
-			sum := m.Sum()
-			if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
-				err := streams.Apply(metrics.Sum(m), p.sums.aggr.Aggregate)
-				errs = errors.Join(errs, err)
-				sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
-			}
-			n = sum.DataPoints().Len()
-		case pmetric.MetricTypeHistogram:
-			hist := m.Histogram()
-			if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
-				err := streams.Apply(metrics.Histogram(m), p.hist.aggr.Aggregate)
-				errs = errors.Join(errs, err)
-				hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
-			}
-			n = hist.DataPoints().Len()
-		case pmetric.MetricTypeExponentialHistogram:
-			expo := m.ExponentialHistogram()
-			if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta {
-				err := streams.Apply(metrics.ExpHistogram(m), p.expo.aggr.Aggregate)
-				errs = errors.Join(errs, err)
-				expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
-			}
-			n = expo.DataPoints().Len()
-		case pmetric.MetricTypeSummary:
-			n = m.Summary().DataPoints().Len()
-		}
-		return n > 0
-	})
-	if errs != nil {
-		return errs
-	}
+func (m state) Has(id identity.Stream) bool {
+	_, nok := m.nums[id]
+	_, hok := m.hist[id]
+	_, eok := m.expo[id]
+	return nok || hok || eok
+}
 
-	if md.MetricCount() == 0 {
-		return nil
+func (m state) Delete(id identity.Stream) {
+	delete(m.nums, id)
+	delete(m.hist, id)
+	delete(m.expo, id)
+}
+
+func (m state) BeginWith(id identity.Stream, dp any) {
+	switch dp := dp.(type) {
+	case pmetric.NumberDataPoint:
+		m.nums[id] = pmetric.NewNumberDataPoint()
+		dp.CopyTo(m.nums[id])
+	case pmetric.HistogramDataPoint:
+		m.hist[id] = pmetric.NewHistogramDataPoint()
+		dp.CopyTo(m.hist[id])
+	case pmetric.ExponentialHistogramDataPoint:
+		m.expo[id] = pmetric.NewExponentialHistogramDataPoint()
+		dp.CopyTo(m.expo[id])
 	}
-	return p.next.ConsumeMetrics(ctx, md)
 }
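Editor's sketch, not part of this changeset: the aggregation contract of the rewritten ConsumeMetrics above, reduced to plain int64 values standing in for pmetric datapoints. The first sample of a stream seeds the state (BeginWith), every later delta is folded into that state (as delta.AccumulateInto does), and the now-cumulative value is what gets copied back into the forwarded datapoint. Type and method names below are assumptions for illustration, not the processor's API.

package main

import "fmt"

// state maps a stream identity to its running cumulative value.
type state map[string]int64

// consume returns the cumulative value forwarded for one delta sample.
func (s state) consume(stream string, delta int64) int64 {
	if _, ok := s[stream]; !ok {
		s[stream] = delta // first sample: state begins as the delta itself (BeginWith)
		return delta
	}
	s[stream] += delta // fold the delta into state (AccumulateInto, for numbers)
	return s[stream]   // copy back to the datapoint (state.CopyTo(dp))
}

func main() {
	last := state{}
	for _, d := range []int64{5, 3, 7} {
		fmt.Println(last.consume("stream-a", d)) // prints 5, 8, 15
	}
}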