Skip to content

Commit

Permalink
[chore][service] Drop component metrics depending on level (#12143)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

<!-- Issue number if applicable -->

Drops metrics that depend on the metrics level:
- Batch processor metric
- otelarrow metrics (see open-telemetry/otel-arrow/issues/280 for
limitation).
- internal/otelarrow/netstats metrics. I did not implement
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f058256e8339e49e4c89ac622a9ef47b52334/internal/otelarrow/netstats/netstats.go#L133-L136
since `LevelNone` drops all metrics.

This attemps to unblock #11601 by hardcoding the metrics here since
there is a small number of them. Once we do #11754 we can move this back
to the individual components

#### Link to tracking issue

Updates #11061
  • Loading branch information
mx-psi authored Jan 22, 2025
1 parent 6740a28 commit 70f9fe9
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 6 deletions.
11 changes: 9 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,17 @@ func (b *shard[T]) sendItems(trigger trigger) {
return
}
var bytes int
if b.processor.telemetry.detailed {
bpt := b.processor.telemetry

// Check if the instrument is enabled to calculate the size of the batch in bytes.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool })
if !ok || instr.Enabled(bpt.exportCtx) {
bytes = b.batch.sizeBytes(req)
}
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))

bpt.record(trigger, int64(sent), int64(bytes))
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down
4 changes: 0 additions & 4 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/internal"
Expand All @@ -23,8 +22,6 @@ const (
)

type batchProcessorTelemetry struct {
detailed bool

exportCtx context.Context

processorAttr metric.MeasurementOption
Expand All @@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali

return &batchProcessorTelemetry{
exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder,
processorAttr: attrs,
}, nil
Expand Down
77 changes: 77 additions & 0 deletions service/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
Expand All @@ -36,6 +37,15 @@ type meterProviderSettings struct {
asyncErrorChannel chan error
}

func dropViewOption(instrument sdkmetric.Instrument) sdkmetric.Option {
return sdkmetric.WithView(sdkmetric.NewView(
instrument,
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationDrop{},
},
))
}

// newMeterProvider creates a new MeterProvider from Config.
func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) {
if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 {
Expand All @@ -56,6 +66,73 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
opts = append(opts, sdkmetric.WithReader(r))
}

// otel-arrow library metrics
// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
if set.cfg.Level < configtelemetry.LevelNormal {
scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"}
opts = append(opts,
dropViewOption(sdkmetric.Instrument{
Name: "arrow_batch_records",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_schema_resets",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_memory_inuse",
Scope: scope,
}),
)
}

// contrib's internal/otelarrow/netstats metrics
// See
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"}
// Compressed size metrics.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

// makeRecvMetrics for exporters.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv_wire",
Scope: scope,
}))

// makeSentMetrics for receivers.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent_wire",
Scope: scope,
}))
}

// Batch processor metrics
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "go.opentelemetry.io/collector/processor/batchprocessor"}
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_processor_batch_batch_send_size_bytes",
Scope: scope,
}))
}

var err error
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
Expand Down
69 changes: 69 additions & 0 deletions service/telemetry/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (

io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/contrib/config"
"go.opentelemetry.io/otel/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -232,3 +234,70 @@ func getMetricsFromPrometheus(t *testing.T, endpoint string) map[string]*io_prom

return parsed
}

// Test that the MeterProvider implements the 'Enabled' functionality.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled.
func TestInstrumentEnabled(t *testing.T) {
prom := promtest.GetAvailableLocalAddressPrometheus(t)
set := meterProviderSettings{
res: sdkresource.Default(),
cfg: MetricsConfig{
Level: configtelemetry.LevelDetailed,
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: prom}},
}},
},
asyncErrorChannel: make(chan error),
}
meterProvider, err := newMeterProvider(set, false)
defer func() {
if prov, ok := meterProvider.(interface{ Shutdown(context.Context) error }); ok {
require.NoError(t, prov.Shutdown(context.Background()))
}
}()
require.NoError(t, err)

meter := meterProvider.Meter("go.opentelemetry.io/collector/service/telemetry")

type enabledInstrument interface{ Enabled(context.Context) bool }

intCnt, err := meter.Int64Counter("int64.counter")
require.NoError(t, err)
_, ok := intCnt.(enabledInstrument)
assert.True(t, ok, "Int64Counter does not implement the experimental 'Enabled' method")

intUpDownCnt, err := meter.Int64UpDownCounter("int64.updowncounter")
require.NoError(t, err)
_, ok = intUpDownCnt.(enabledInstrument)
assert.True(t, ok, "Int64UpDownCounter does not implement the experimental 'Enabled' method")

intHist, err := meter.Int64Histogram("int64.updowncounter")
require.NoError(t, err)
_, ok = intHist.(enabledInstrument)
assert.True(t, ok, "Int64Histogram does not implement the experimental 'Enabled' method")

intGauge, err := meter.Int64Gauge("int64.updowncounter")
require.NoError(t, err)
_, ok = intGauge.(enabledInstrument)
assert.True(t, ok, "Int64Gauge does not implement the experimental 'Enabled' method")

floatCnt, err := meter.Float64Counter("int64.updowncounter")
require.NoError(t, err)
_, ok = floatCnt.(enabledInstrument)
assert.True(t, ok, "Float64Counter does not implement the experimental 'Enabled' method")

floatUpDownCnt, err := meter.Float64UpDownCounter("int64.updowncounter")
require.NoError(t, err)
_, ok = floatUpDownCnt.(enabledInstrument)
assert.True(t, ok, "Float64UpDownCounter does not implement the experimental 'Enabled' method")

floatHist, err := meter.Float64Histogram("int64.updowncounter")
require.NoError(t, err)
_, ok = floatHist.(enabledInstrument)
assert.True(t, ok, "Float64Histogram does not implement the experimental 'Enabled' method")

floatGauge, err := meter.Float64Gauge("int64.updowncounter")
require.NoError(t, err)
_, ok = floatGauge.(enabledInstrument)
assert.True(t, ok, "Float64Gauge does not implement the experimental 'Enabled' method")
}

0 comments on commit 70f9fe9

Please sign in to comment.