Skip to content

Commit

Permalink
Implement sum per sec aggregation type (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar authored Aug 29, 2023
1 parent b3b0688 commit 4c5271b
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 32 deletions.
33 changes: 30 additions & 3 deletions internal/otelcollector/exporter/inmemexporter/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"go.elastic.co/fastjson"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -23,6 +24,7 @@ type AggregationType string
const (
Last AggregationType = "last"
Sum AggregationType = "sum"
Rate AggregationType = "rate"
)

// Store is an in-memory data store for telemetry data. Data
Expand Down Expand Up @@ -165,7 +167,16 @@ func (s *Store) Get(cfg AggregationConfig) (float64, error) {
if !ok {
return 0, nil
}
return dp.DoubleValue(), nil
switch cfg.Type {
case Rate:
duration := time.Duration(dp.Timestamp() - dp.StartTimestamp()).Seconds()
if duration <= 0 {
return 0, nil
}
return dp.DoubleValue() / duration, nil
default:
return dp.DoubleValue(), nil
}
}

// Reset resets the store by deleting all cached data.
Expand Down Expand Up @@ -196,13 +207,12 @@ func (s *Store) add(m pmetric.Metric, resAttrs pcommon.Map) {
case pmetric.MetricTypeSum:
if m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
s.logger.Warn(
"skipping metric, cumulative temporality not implemented",
"unexpected, all cumulative temporality should be converted to delta",
zap.String("name", m.Name()),
zap.String("type", m.Type().String()),
)
return
}
// TODO (lahsivjar): need to handle start time?
s.mergeNumberDataPoints(m.Name(), m.Sum().DataPoints(), resAttrs)
default:
s.logger.Warn(
Expand Down Expand Up @@ -235,6 +245,23 @@ func (s *Store) mergeNumberDataPoints(
to.SetDoubleValue(doubleValue(dp))
case Sum:
to.SetDoubleValue(to.DoubleValue() + doubleValue(dp))
case Rate:
to.SetDoubleValue(to.DoubleValue() + doubleValue(dp))
// We will use to#StartTimestamp and to#Timestamp fields to
// cache the lowest and the highest timestamps. This will be
// used at query time to calculate rate.
if to.StartTimestamp() == 0 {
// If the data point has a start timestamp then use that
// as the start timestamp, else use the end timestamp.
if dp.StartTimestamp() != 0 {
to.SetStartTimestamp(dp.StartTimestamp())
} else {
to.SetStartTimestamp(dp.Timestamp())
}
}
if to.Timestamp() < dp.Timestamp() {
to.SetTimestamp(dp.Timestamp())
}
}
}
}
Expand Down
93 changes: 64 additions & 29 deletions internal/otelcollector/exporter/inmemexporter/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ package inmemexporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -79,6 +81,7 @@ func TestNewStore(t *testing.T) {
}

func TestAdd(t *testing.T) {
startTime := time.Unix(0, 0).UTC().Add(time.Second)
allMetricNames, cfgs := getTestAggCfg()
for _, tt := range []struct {
name string
Expand All @@ -91,24 +94,29 @@ func TestAdd(t *testing.T) {
[]string{"404"}, 1.1,
nil,
pmetric.MetricTypeGauge,
startTime, startTime,
).get(),
expected: []float64{0, 0},
expected: []float64{0, 0, 0},
},
{
name: "filtered_input",
input: newMetrics(nil).
addMetric(
allMetricNames, 1.1,
map[string]string{"k_1": "v_1"},
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime.Add(time.Second),
).
addMetric(
allMetricNames, 2.2,
map[string]string{"k_1": "v_1", "k_2": "v_2"},
pmetric.MetricTypeGauge).
get(),
pmetric.MetricTypeGauge,
startTime.Add(time.Second), startTime.Add(2*time.Second),
).get(),
expected: []float64{
3.3, // sum
2.2, // last
2.2, // last
3.3, // sum
1.65, // rate
},
},
{
Expand All @@ -117,15 +125,19 @@ func TestAdd(t *testing.T) {
addMetric(
allMetricNames, 1.1,
nil,
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime.Add(time.Second),
).
addMetric(
allMetricNames, 2.2,
map[string]string{"k_3": "v_3"},
pmetric.MetricTypeGauge).
get(),
pmetric.MetricTypeGauge,
startTime.Add(time.Second), startTime.Add(2*time.Second),
).get(),
expected: []float64{
3.3, // sum
2.2, // last
2.2, // last
3.3, // sum
1.65, // rate
},
},
{
Expand All @@ -135,44 +147,57 @@ func TestAdd(t *testing.T) {
addMetric(
allMetricNames, 1.1,
nil,
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime,
).
// label key doesn't match
addMetric(
allMetricNames, 1.1,
map[string]string{"k_2": "v_1"},
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime,
).
// label value doesn't match
addMetric(
allMetricNames, 2.2,
map[string]string{"k_1": "v_2"},
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime,
).
// name doesn't match
addMetric(
[]string{"404"}, 3.3,
map[string]string{"k_1": "v_1"},
pmetric.MetricTypeGauge).
get(),
expected: []float64{0, 0},
pmetric.MetricTypeGauge,
startTime, startTime,
).get(),
expected: []float64{0, 0, 0},
},
{
name: "mixed_input",
input: newMetrics(nil).
addMetric(
allMetricNames, 1.1,
map[string]string{"k_1": "v_1"},
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime.Add(time.Second),
).
addMetric(
allMetricNames, 2.2,
map[string]string{"k_2": "v_1"},
pmetric.MetricTypeGauge).
pmetric.MetricTypeGauge,
startTime, startTime.Add(time.Second),
).
addMetric(
allMetricNames, 3.3,
map[string]string{"k_1": "v_1"},
pmetric.MetricTypeGauge).
get(),
pmetric.MetricTypeGauge,
startTime.Add(time.Second), startTime.Add(2*time.Second),
).get(),
expected: []float64{
4.4, // sum
3.3, // last
4.4, // sum
2.2, // rate
},
},
} {
Expand All @@ -195,31 +220,34 @@ type testMetricSlice struct {
ms pmetric.MetricSlice
}

func newMetrics(resAttrs map[string]string) testMetricSlice {
func newMetrics(resAttrs map[string]string) *testMetricSlice {
m := pmetric.NewMetrics()
rm := m.ResourceMetrics().AppendEmpty()
for k, v := range resAttrs {
rm.Resource().Attributes().PutStr(k, v)
}
return testMetricSlice{
return &testMetricSlice{
m: m,
ms: rm.ScopeMetrics().AppendEmpty().Metrics(),
}
}

func (tms testMetricSlice) addMetric(
func (tms *testMetricSlice) addMetric(
names []string,
val float64,
attrs map[string]string,
t pmetric.MetricType,
) testMetricSlice {
startTime, endTime time.Time,
) *testMetricSlice {
for _, name := range names {
m := tms.ms.AppendEmpty()
m.SetName(name)
switch t {
case pmetric.MetricTypeGauge:
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
dp.SetDoubleValue(val)
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime))
dp.SetTimestamp(pcommon.NewTimestampFromTime(endTime))
for k, v := range attrs {
dp.Attributes().PutStr(k, v)
}
Expand All @@ -228,12 +256,19 @@ func (tms testMetricSlice) addMetric(
return tms
}

func (tms testMetricSlice) get() pmetric.Metrics {
func (tms *testMetricSlice) get() pmetric.Metrics {
return tms.m
}

func getTestAggCfg() ([]string, []AggregationConfig) {
cfgs := []AggregationConfig{
{
Name: "test_last",
MatchLabelValues: map[string]string{
"k_1": "v_1",
},
Type: Last,
},
{
Name: "test_sum",
MatchLabelValues: map[string]string{
Expand All @@ -242,11 +277,11 @@ func getTestAggCfg() ([]string, []AggregationConfig) {
Type: Sum,
},
{
Name: "test_last",
Name: "test_rate",
MatchLabelValues: map[string]string{
"k_1": "v_1",
},
Type: Last,
Type: Rate,
},
}

Expand Down

0 comments on commit 4c5271b

Please sign in to comment.