Skip to content

Commit

Permalink
processor/deltatocumulative: telemetry tests (#35742)
Browse files Browse the repository at this point in the history
#### Description

Tests internal telemetry (metadata.TelemetryBuilder) is recorded as
expected.

Introduces `internal/testing/sdktest` for this.
Introduces `-- telemetry --` section to testdata.

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Existing tests were extended to have a `-- telemetry --` section that
specifies expected meter readings in `sdktest.Format`

<!--Describe the documentation added.-->
#### Documentation
not needed

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
sh0rez authored Nov 19, 2024
1 parent 2a9e103 commit 9466fb2
Show file tree
Hide file tree
Showing 17 changed files with 665 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"
)

// T is the testing helper. Most notably it provides [T.Equal]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
)

func New(set component.TelemetrySettings) (Metrics, error) {
zero := func() int { return -1 }
m := Metrics{
tracked: func() int { return 0 },
tracked: &zero,
}

trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 {
return int64(m.tracked())
return int64((*m.tracked)())
})

telb, err := metadata.NewTelemetryBuilder(set, trackedCb)
Expand All @@ -36,15 +37,15 @@ func New(set component.TelemetrySettings) (Metrics, error) {
type Metrics struct {
metadata.TelemetryBuilder

tracked func() int
tracked *func() int
}

func (m Metrics) Datapoints() Counter {
return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear}
}

func (m *Metrics) WithTracked(streams func() int) {
m.tracked = streams
*m.tracked = streams
}

func Error(msg string) attribute.KeyValue {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"

import (
"reflect"
Expand All @@ -11,17 +11,28 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
)

var Opts = []cmp.Option{
var allow = []string{
"go.opentelemetry.io/collector/pdata",
"go.opentelemetry.io/otel",
"github.com/open-telemetry/opentelemetry-collector-contrib",
}

var Opts = cmp.Options{
cmpopts.EquateApprox(0, 1e-9),
cmp.Exporter(func(ty reflect.Type) bool {
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib")
for _, prefix := range allow {
if strings.HasPrefix(ty.PkgPath(), prefix) {
return true
}
}
return false
}),
}

func Equal[T any](a, b T, opts ...cmp.Option) bool {
return cmp.Equal(a, b, append(Opts, opts...)...)
return cmp.Equal(a, b, Opts, cmp.Options(opts))
}

func Diff[T any](a, b T, opts ...cmp.Option) string {
return cmp.Diff(a, b, append(Opts, opts...)...)
return cmp.Diff(a, b, Opts, cmp.Options(opts))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// sdktest performs partial comparison of [sdk.ResourceMetrics] to a [Spec].
package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest"

import (
"context"
"fmt"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
sdk "go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare"
)

type Option = cmp.Option

// Test the metrics returned by [metric.ManualReader.Collect] against the [Spec]
func Test(spec Spec, mr *metric.ManualReader, opts ...Option) error {
var rm sdk.ResourceMetrics
if err := mr.Collect(context.Background(), &rm); err != nil {
return err
}
return Compare(spec, rm, opts...)
}

// Compare the [sdk.ResourceMetrics] against the [Spec]
func Compare(spec Spec, rm sdk.ResourceMetrics, opts ...Option) error {
got := Flatten(rm)
want := Metrics(spec)

diff := compare.Diff(want, got,
IgnoreUnspec(spec),
IgnoreTime(),
IgnoreMetadata(),
cmpopts.EquateEmpty(),
Transform(),
Sort(),
cmp.Options(opts),
)

if diff != "" {
return fmt.Errorf("\n%s", diff)
}
return nil
}

// IgnoreTime ignores [sdk.DataPoint.Time] and [sdk.DataPoint.StartTime],
// because those are changing per run and typically not of interest.
func IgnoreTime() Option {
return cmp.Options{
cmpopts.IgnoreFields(sdk.DataPoint[int64]{}, "StartTime", "Time"),
cmpopts.IgnoreFields(sdk.DataPoint[float64]{}, "StartTime", "Time"),
}
}

// IgnoreTime ignores [sdk.Metrics.Unit] and [sdk.Metrics.Description],
// because those are usually static
func IgnoreMetadata() Option {
return cmpopts.IgnoreFields(sdk.Metrics{}, "Description", "Unit")
}

// IgnoreUnspec ignores any Metrics not present in the [Spec]
func IgnoreUnspec(spec Spec) Option {
return cmpopts.IgnoreSliceElements(func(m sdk.Metrics) bool {
_, ok := spec[m.Name]
return !ok
})
}

// Sort [sdk.Metrics] by name and [sdk.DataPoint] by their [attribute.Set]
func Sort() Option {
return cmp.Options{
cmpopts.SortSlices(func(a, b sdk.Metrics) bool {
return a.Name < b.Name
}),
sort[int64](), sort[float64](),
}
}

func sort[N int64 | float64]() Option {
return cmpopts.SortSlices(func(a, b DataPoint[N]) bool {
as := a.DataPoint.Attributes.Encoded(attribute.DefaultEncoder())
bs := b.DataPoint.Attributes.Encoded(attribute.DefaultEncoder())
return as < bs
})
}

// DataPoint holds a [sdk.DataPoints] and its attributes as a plain map.
// See [Transform]
type DataPoint[N int64 | float64] struct {
Attributes map[string]any
sdk.DataPoint[N]
}

// Transform turns []sdk.DataPoint[N] into []DataPoint[N].
//
// Primarily done to have DataPoint.Attributes as a flat, diffable map instead
// of the hard to understand internal structure of [attribute.Set], which is
// being truncated by go-cmp before reaching the depth where attribute values
// appear.
//
// This must happen on the slice level, transforming the values is not
// sufficient because when entire DataPoints are added / removed, go-cmp does
// not apply transformers on the fields.
func Transform() Option {
return cmp.Options{
transform[int64](),
transform[float64](),
cmpopts.IgnoreTypes(attribute.Set{}),
}
}

func transform[N int64 | float64]() Option {
return cmpopts.AcyclicTransformer(fmt.Sprintf("sdktest.Transform.%T", *new(N)),
func(dps []sdk.DataPoint[N]) []DataPoint[N] {
out := make([]DataPoint[N], len(dps))
for i, dp := range dps {
out[i] = DataPoint[N]{DataPoint: dp, Attributes: attrMap(dp.Attributes)}
}
return out
},
)
}

func attrMap(set attribute.Set) map[string]any {
m := make(map[string]any)
for _, kv := range set.ToSlice() {
m[string(kv.Key)] = kv.Value.AsInterface()
}
return m
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sdktest

import (
"context"
"fmt"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

// The output of [Test] and [Compare] is similar to the following:
//
// []metricdata.Metrics{
// - {
// - Name: "not.exist",
// - Data: metricdata.Sum[float64]{
// - DataPoints: []metricdata.DataPoint[float64]{{...}},
// - Temporality: s"CumulativeTemporality",
// - },
// - },
// {
// Name: "requests.total",
// Description: "I will be inherited",
// Unit: "",
// Data: metricdata.Sum[int64]{
// DataPoints: []metricdata.DataPoint[int64](Inverse(sdktest.Transform.int64, []sdktest.DataPoint[int64]{
// {DataPoint: {StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569", Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070", Value: 20, ...}, Attributes: {}},
// {
// DataPoint: metricdata.DataPoint[int64]{
// ... // 1 ignored field
// StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569",
// Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070",
// - Value: 4,
// + Value: 3,
// Exemplars: nil,
// },
// Attributes: {"error": string("limit")},
// },
// })),
// Temporality: s"CumulativeTemporality",
// IsMonotonic: true,
// },
// },
// }
//
// Which is used as follows:
func Example() {
var spec Spec
_ = Unmarshal([]byte(`
gauge streams.tracked:
- int: 40
counter requests.total:
- int: 20
- int: 4
attr: {error: "limit"}
updown not.exist:
- float: 33.3
`), &spec)

mr := sdk.NewManualReader()
meter := sdk.NewMeterProvider(sdk.WithReader(mr)).Meter("test")

ctx := context.TODO()

gauge, _ := meter.Int64Gauge("streams.tracked")
gauge.Record(ctx, 40)

count, _ := meter.Int64Counter("requests.total", metric.WithDescription("I will be inherited"))
count.Add(ctx, 20)
count.Add(ctx, 3, metric.WithAttributes(attribute.String("error", "limit")))

err := Test(spec, mr)
fmt.Println(err)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest"

import (
sdk "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// Metrics returns the [sdk.Metrics] defined by this [Spec]
func Metrics(spec Spec) []sdk.Metrics {
md := make([]sdk.Metrics, 0, len(spec))
for _, spec := range spec {
md = append(md, spec.Into())
}
return md
}

func (spec Metric) Into() sdk.Metrics {
m := sdk.Metrics{Name: spec.Name}
if len(spec.Numbers) == 0 {
return m
}

var (
ints []sdk.DataPoint[int64]
floats []sdk.DataPoint[float64]
)
for _, n := range spec.Numbers {
attr := n.Attr.Into()
switch {
case n.Int != nil:
ints = append(ints, sdk.DataPoint[int64]{Attributes: attr, Value: *n.Int})
case n.Float != nil:
floats = append(floats, sdk.DataPoint[float64]{Attributes: attr, Value: *n.Float})
}
}

switch {
case spec.Type == TypeGauge && ints != nil:
m.Data = sdk.Gauge[int64]{DataPoints: ints}
case spec.Type == TypeGauge && floats != nil:
m.Data = sdk.Gauge[float64]{DataPoints: floats}
case spec.Type == TypeSum && ints != nil:
m.Data = sdk.Sum[int64]{DataPoints: ints, Temporality: spec.Temporality, IsMonotonic: spec.Monotonic}
case spec.Type == TypeSum && floats != nil:
m.Data = sdk.Sum[float64]{DataPoints: floats, Temporality: spec.Temporality, IsMonotonic: spec.Monotonic}
}

return m
}

// Flatten turns the nested [sdk.ResourceMetrics] structure into a flat
// [sdk.Metrics] slice. If a metric is present multiple time in different scopes
// / resources, the last occurrence is used.
func Flatten(rm sdk.ResourceMetrics) []sdk.Metrics {
set := make(map[string]sdk.Metrics)
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
set[m.Name] = m
}
}
md := make([]sdk.Metrics, 0, len(set))
for _, m := range set {
md = append(md, m)
}
return md
}
Loading

0 comments on commit 9466fb2

Please sign in to comment.