extract datapoints into their own internal package
dmathieu committed Jan 15, 2025
1 parent 338134a commit af44849
Showing 11 changed files with 363 additions and 275 deletions.
21 changes: 11 additions & 10 deletions exporter/elasticsearchexporter/exporter.go
@@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/pool"
)

@@ -209,24 +210,24 @@ func (e *elasticsearchExporter) pushMetricsData(
var validationErrs []error // log instead of returning these so that upstream does not retry
scopeMetrics := scopeMetrics.At(j)
scope := scopeMetrics.Scope()
- groupedDataPointsByIndex := make(map[string]map[uint32][]dataPoint)
+ groupedDataPointsByIndex := make(map[string]map[uint32][]datapoints.DataPoint)
for k := 0; k < scopeMetrics.Metrics().Len(); k++ {
metric := scopeMetrics.Metrics().At(k)

- upsertDataPoint := func(dp dataPoint) error {
+ upsertDataPoint := func(dp datapoints.DataPoint) error {
fIndex, err := e.getMetricDataPointIndex(resource, scope, dp)
if err != nil {
return err
}
groupedDataPoints, ok := groupedDataPointsByIndex[fIndex]
if !ok {
- groupedDataPoints = make(map[uint32][]dataPoint)
+ groupedDataPoints = make(map[uint32][]datapoints.DataPoint)
groupedDataPointsByIndex[fIndex] = groupedDataPoints
}
dpHash := e.model.hashDataPoint(dp)
dataPoints, ok := groupedDataPoints[dpHash]
if !ok {
- groupedDataPoints[dpHash] = []dataPoint{dp}
+ groupedDataPoints[dpHash] = []datapoints.DataPoint{dp}
} else {
groupedDataPoints[dpHash] = append(dataPoints, dp)
}
@@ -238,7 +239,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
- if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
+ if err := upsertDataPoint(datapoints.NewNumber(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
@@ -247,7 +248,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
- if err := upsertDataPoint(newNumberDataPoint(metric, dp)); err != nil {
+ if err := upsertDataPoint(datapoints.NewNumber(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
@@ -260,7 +261,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
- if err := upsertDataPoint(newExponentialHistogramDataPoint(metric, dp)); err != nil {
+ if err := upsertDataPoint(datapoints.NewExponentialHistogram(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
@@ -273,7 +274,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
- if err := upsertDataPoint(newHistogramDataPoint(metric, dp)); err != nil {
+ if err := upsertDataPoint(datapoints.NewHistogram(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
@@ -282,7 +283,7 @@ func (e *elasticsearchExporter) pushMetricsData(
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
- if err := upsertDataPoint(newSummaryDataPoint(metric, dp)); err != nil {
+ if err := upsertDataPoint(datapoints.NewSummary(metric, dp)); err != nil {
validationErrs = append(validationErrs, err)
continue
}
@@ -326,7 +327,7 @@ func (e *elasticsearchExporter) pushMetricsData(
func (e *elasticsearchExporter) getMetricDataPointIndex(
resource pcommon.Resource,
scope pcommon.InstrumentationScope,
- dataPoint dataPoint,
+ dataPoint datapoints.DataPoint,
) (string, error) {
fIndex := e.index
if e.dynamicIndex {
23 changes: 23 additions & 0 deletions exporter/elasticsearchexporter/internal/datapoints/datapoints.go
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datapoints // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

// DataPoint is an interface that allows specifying behavior for each type of data point
type DataPoint interface {
Timestamp() pcommon.Timestamp
StartTimestamp() pcommon.Timestamp
Attributes() pcommon.Map
Value() (pcommon.Value, error)
DynamicTemplate(pmetric.Metric) string
DocCount() uint64
HasMappingHint(elasticsearch.MappingHint) bool
Metric() pmetric.Metric
}
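
For orientation, here is a minimal sketch (not part of this commit) of how a caller might consume the DataPoint interface without switching on the concrete metric type. The encodeDataPoint helper and its output format are hypothetical, and the snippet assumes it is built from within the exporter module, since an internal package cannot be imported from outside it.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
)

// encodeDataPoint is a hypothetical helper: Number, Histogram,
// ExponentialHistogram and Summary all satisfy datapoints.DataPoint,
// so the caller only deals with the interface.
func encodeDataPoint(dp datapoints.DataPoint) error {
	value, err := dp.Value()
	if err != nil {
		return err // e.g. NaN/Inf numbers or malformed histogram buckets
	}
	metric := dp.Metric()
	fmt.Printf("metric=%s template=%s doc_count=%d value=%v\n",
		metric.Name(), dp.DynamicTemplate(metric), dp.DocCount(), value.AsRaw())
	return nil
}

func main() {
	metric := pmetric.NewMetric()
	metric.SetName("requests.total")
	dp := metric.SetEmptySum().DataPoints().AppendEmpty()
	dp.SetIntValue(42)
	_ = encodeDataPoint(datapoints.NewNumber(metric, dp))
}
```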
68 changes: 68 additions & 0 deletions exporter/elasticsearchexporter/internal/datapoints/exponential_histogram.go
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datapoints // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram"
)

type ExponentialHistogram struct {
pmetric.ExponentialHistogramDataPoint
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func NewExponentialHistogram(metric pmetric.Metric, dp pmetric.ExponentialHistogramDataPoint) ExponentialHistogram {
return ExponentialHistogram{
ExponentialHistogramDataPoint: dp,
MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()),
metric: metric,
}
}

func (dp ExponentialHistogram) Value() (pcommon.Value, error) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
vm := pcommon.NewValueMap()
m := vm.Map()
m.PutDouble("sum", dp.Sum())
m.PutInt("value_count", safeUint64ToInt64(dp.Count()))
return vm, nil
}

counts, values := exphistogram.ToTDigest(dp.ExponentialHistogramDataPoint)

vm := pcommon.NewValueMap()
m := vm.Map()
vmCounts := m.PutEmptySlice("counts")
vmCounts.EnsureCapacity(len(counts))
for _, c := range counts {
vmCounts.AppendEmpty().SetInt(c)
}
vmValues := m.PutEmptySlice("values")
vmValues.EnsureCapacity(len(values))
for _, v := range values {
vmValues.AppendEmpty().SetDouble(v)
}

return vm, nil
}

func (dp ExponentialHistogram) DynamicTemplate(_ pmetric.Metric) string {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
return "histogram"
}

func (dp ExponentialHistogram) DocCount() uint64 {
return dp.Count()
}

func (dp ExponentialHistogram) Metric() pmetric.Metric {
return dp.metric
}
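
A short usage sketch, under the same caveat that the internal package is only importable from inside the exporter module: it builds a scale-0 exponential histogram data point and prints the counts/values map that Value() derives via exphistogram.ToTDigest. With the aggregate_metric_double mapping hint the same call would instead return the sum/value_count summary shown above. The metric name here is made up.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"
)

func main() {
	metric := pmetric.NewMetric()
	metric.SetName("request.duration")
	dp := metric.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
	dp.SetScale(0) // scale 0 means base-2 buckets: (1,2], (2,4], ...
	dp.SetCount(4)
	dp.SetSum(10)
	dp.Positive().SetOffset(0)
	dp.Positive().BucketCounts().FromRaw([]uint64{1, 3})

	v, err := datapoints.NewExponentialHistogram(metric, dp).Value()
	if err != nil {
		panic(err)
	}
	// Without a mapping hint, Value() returns a map holding parallel
	// "counts" and "values" slices (one centroid per populated bucket).
	fmt.Println(v.AsRaw())
}
```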
101 changes: 101 additions & 0 deletions exporter/elasticsearchexporter/internal/datapoints/histogram.go
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datapoints // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"

import (
"errors"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

type Histogram struct {
pmetric.HistogramDataPoint
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func NewHistogram(metric pmetric.Metric, dp pmetric.HistogramDataPoint) Histogram {
return Histogram{
HistogramDataPoint: dp,
MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()),
metric: metric,
}
}

func (dp Histogram) Value() (pcommon.Value, error) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
vm := pcommon.NewValueMap()
m := vm.Map()
m.PutDouble("sum", dp.Sum())
m.PutInt("value_count", safeUint64ToInt64(dp.Count()))
return vm, nil
}
return histogramToValue(dp.HistogramDataPoint)
}

func (dp Histogram) DynamicTemplate(_ pmetric.Metric) string {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
return "histogram"
}

func (dp Histogram) DocCount() uint64 {
return dp.HistogramDataPoint.Count()
}

func (dp Histogram) Metric() pmetric.Metric {
return dp.metric
}

func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {
// Histogram conversion function is from
// https://github.com/elastic/apm-data/blob/3b28495c3cbdc0902983134276eb114231730249/input/otlp/metrics.go#L277
bucketCounts := dp.BucketCounts()
explicitBounds := dp.ExplicitBounds()
if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 {
return pcommon.Value{}, errors.New("invalid histogram data point")
}

vm := pcommon.NewValueMap()
m := vm.Map()
counts := m.PutEmptySlice("counts")
values := m.PutEmptySlice("values")

values.EnsureCapacity(bucketCounts.Len())
counts.EnsureCapacity(bucketCounts.Len())
for i := 0; i < bucketCounts.Len(); i++ {
count := bucketCounts.At(i)
if count == 0 {
continue
}

var value float64
switch i {
// (-infinity, explicit_bounds[i]]
case 0:
value = explicitBounds.At(i)
if value > 0 {
value /= 2
}

// (explicit_bounds[i], +infinity)
case bucketCounts.Len() - 1:
value = explicitBounds.At(i - 1)

// [explicit_bounds[i-1], explicit_bounds[i])
default:
// Use the midpoint between the boundaries.
value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0
}

counts.AppendEmpty().SetInt(safeUint64ToInt64(count))
values.AppendEmpty().SetDouble(value)
}

return vm, nil
}
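
To make the midpoint logic concrete, here is a hypothetical test-style sketch (not part of the commit) that could sit in the same package, since histogramToValue is unexported. With bounds [1, 5] and counts [2, 3, 4], the underflow bucket is represented by half of the first bound (0.5), the interior bucket by the midpoint (3), and the overflow bucket by the last bound (5); the expected raw map below reflects that.

```go
package datapoints

import (
	"reflect"
	"testing"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// TestHistogramToValueMidpoints is a hypothetical sketch of the
// bucket-to-centroid mapping performed by histogramToValue.
func TestHistogramToValueMidpoints(t *testing.T) {
	dp := pmetric.NewHistogramDataPoint()
	dp.ExplicitBounds().FromRaw([]float64{1, 5})
	dp.BucketCounts().FromRaw([]uint64{2, 3, 4})

	v, err := histogramToValue(dp)
	if err != nil {
		t.Fatal(err)
	}

	want := map[string]any{
		"counts": []any{int64(2), int64(3), int64(4)}, // one entry per non-empty bucket
		"values": []any{0.5, 3.0, 5.0},                // underflow, interior midpoint, overflow
	}
	if got := v.AsRaw(); !reflect.DeepEqual(got, want) {
		t.Fatalf("got %v, want %v", got, want)
	}
}
```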
82 changes: 82 additions & 0 deletions exporter/elasticsearchexporter/internal/datapoints/number.go
@@ -0,0 +1,82 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package datapoints // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/datapoints"

import (
"errors"
"math"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

var errInvalidNumber = errors.New("invalid number data point")

type Number struct {
pmetric.NumberDataPoint
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func NewNumber(metric pmetric.Metric, dp pmetric.NumberDataPoint) Number {
return Number{
NumberDataPoint: dp,
MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()),
metric: metric,
}
}

func (dp Number) Value() (pcommon.Value, error) {
switch dp.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
value := dp.DoubleValue()
if math.IsNaN(value) || math.IsInf(value, 0) {
return pcommon.Value{}, errInvalidNumber
}
return pcommon.NewValueDouble(value), nil
case pmetric.NumberDataPointValueTypeInt:
return pcommon.NewValueInt(dp.IntValue()), nil
}
return pcommon.Value{}, errInvalidNumber
}

func (dp Number) DynamicTemplate(metric pmetric.Metric) string {
switch metric.Type() {
case pmetric.MetricTypeSum:
switch dp.NumberDataPoint.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
if metric.Sum().IsMonotonic() {
return "counter_double"
}
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
if metric.Sum().IsMonotonic() {
return "counter_long"
}
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
case pmetric.MetricTypeGauge:
switch dp.NumberDataPoint.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
return "gauge_double"
case pmetric.NumberDataPointValueTypeInt:
return "gauge_long"
default:
return "" // NumberDataPointValueTypeEmpty should already be discarded in numberToValue
}
}
return ""
}

func (dp Number) DocCount() uint64 {
return 1
}

func (dp Number) Metric() pmetric.Metric {
return dp.metric
}
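
Similarly, a hypothetical in-package test sketch (not part of the commit) illustrating the two behaviours above: the dynamic template name follows the metric type, monotonicity and value type, and non-finite doubles are rejected by Value().

```go
package datapoints

import (
	"math"
	"testing"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// TestNumberTemplateAndValidation is a hypothetical sketch, not part of the commit.
func TestNumberTemplateAndValidation(t *testing.T) {
	metric := pmetric.NewMetric()
	sum := metric.SetEmptySum()
	sum.SetIsMonotonic(true)
	dp := sum.DataPoints().AppendEmpty()
	dp.SetDoubleValue(1.5)

	// A monotonic double sum maps to the "counter_double" dynamic template.
	if got := NewNumber(metric, dp).DynamicTemplate(metric); got != "counter_double" {
		t.Fatalf("got %q, want %q", got, "counter_double")
	}

	// NaN (and +/-Inf) doubles are rejected instead of being indexed.
	dp.SetDoubleValue(math.NaN())
	if _, err := NewNumber(metric, dp).Value(); err == nil {
		t.Fatal("expected an error for a NaN value")
	}
}
```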