diff --git a/internal/otelcollector/exporter/inmemexporter/store.go b/internal/otelcollector/exporter/inmemexporter/store.go index 4003ee0..eeafd77 100644 --- a/internal/otelcollector/exporter/inmemexporter/store.go +++ b/internal/otelcollector/exporter/inmemexporter/store.go @@ -8,6 +8,7 @@ import ( "fmt" "sort" "sync" + "time" "go.elastic.co/fastjson" "go.opentelemetry.io/collector/pdata/pcommon" @@ -23,6 +24,7 @@ type AggregationType string const ( Last AggregationType = "last" Sum AggregationType = "sum" + Rate AggregationType = "rate" ) // Store is a in-memory data store for telemetry data. Data @@ -165,7 +167,16 @@ func (s *Store) Get(cfg AggregationConfig) (float64, error) { if !ok { return 0, nil } - return dp.DoubleValue(), nil + switch cfg.Type { + case Rate: + duration := time.Duration(dp.Timestamp() - dp.StartTimestamp()).Seconds() + if duration <= 0 { + return 0, nil + } + return dp.DoubleValue() / duration, nil + default: + return dp.DoubleValue(), nil + } } // Reset resets the store by deleting all cached data. @@ -196,13 +207,12 @@ func (s *Store) add(m pmetric.Metric, resAttrs pcommon.Map) { case pmetric.MetricTypeSum: if m.Sum().AggregationTemporality() == pmetric.AggregationTemporalityCumulative { s.logger.Warn( - "skipping metric, cumulative temporality not implemented", + "unexpected, all cumulative temporality should be converted to delta", zap.String("name", m.Name()), zap.String("type", m.Type().String()), ) return } - // TODO (lahsivjar): need to handle start time? s.mergeNumberDataPoints(m.Name(), m.Sum().DataPoints(), resAttrs) default: s.logger.Warn( @@ -235,6 +245,23 @@ func (s *Store) mergeNumberDataPoints( to.SetDoubleValue(doubleValue(dp)) case Sum: to.SetDoubleValue(to.DoubleValue() + doubleValue(dp)) + case Rate: + to.SetDoubleValue(to.DoubleValue() + doubleValue(dp)) + // We will use to#StartTimestamp and to#Timestamp fields to + // cache the lowest and the highest timestamps. This will be + // used at query time to calculate rate. + if to.StartTimestamp() == 0 { + // If the data point has a start timestamp then use that + // as the start timestamp, else use the end timestamp. + if dp.StartTimestamp() != 0 { + to.SetStartTimestamp(dp.StartTimestamp()) + } else { + to.SetStartTimestamp(dp.Timestamp()) + } + } + if to.Timestamp() < dp.Timestamp() { + to.SetTimestamp(dp.Timestamp()) + } } } } diff --git a/internal/otelcollector/exporter/inmemexporter/store_test.go b/internal/otelcollector/exporter/inmemexporter/store_test.go index ed07058..af8a83f 100644 --- a/internal/otelcollector/exporter/inmemexporter/store_test.go +++ b/internal/otelcollector/exporter/inmemexporter/store_test.go @@ -6,9 +6,11 @@ package inmemexporter import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" ) @@ -79,6 +81,7 @@ func TestNewStore(t *testing.T) { } func TestAdd(t *testing.T) { + startTime := time.Unix(0, 0).UTC().Add(time.Second) allMetricNames, cfgs := getTestAggCfg() for _, tt := range []struct { name string @@ -91,8 +94,9 @@ func TestAdd(t *testing.T) { []string{"404"}, 1.1, nil, pmetric.MetricTypeGauge, + startTime, startTime, ).get(), - expected: []float64{0, 0}, + expected: []float64{0, 0, 0}, }, { name: "filtered_input", @@ -100,15 +104,19 @@ func TestAdd(t *testing.T) { addMetric( allMetricNames, 1.1, map[string]string{"k_1": "v_1"}, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime.Add(time.Second), + ). addMetric( allMetricNames, 2.2, map[string]string{"k_1": "v_1", "k_2": "v_2"}, - pmetric.MetricTypeGauge). - get(), + pmetric.MetricTypeGauge, + startTime.Add(time.Second), startTime.Add(2*time.Second), + ).get(), expected: []float64{ - 3.3, // sum - 2.2, // last + 2.2, // last + 3.3, // sum + 1.65, // rate }, }, { @@ -117,15 +125,19 @@ func TestAdd(t *testing.T) { addMetric( allMetricNames, 1.1, nil, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime.Add(time.Second), + ). addMetric( allMetricNames, 2.2, map[string]string{"k_3": "v_3"}, - pmetric.MetricTypeGauge). - get(), + pmetric.MetricTypeGauge, + startTime.Add(time.Second), startTime.Add(2*time.Second), + ).get(), expected: []float64{ - 3.3, // sum - 2.2, // last + 2.2, // last + 3.3, // sum + 1.65, // rate }, }, { @@ -135,24 +147,31 @@ func TestAdd(t *testing.T) { addMetric( allMetricNames, 1.1, nil, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime, + ). // label key doesn't match addMetric( allMetricNames, 1.1, map[string]string{"k_2": "v_1"}, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime, + ). // label value doesn't match addMetric( allMetricNames, 2.2, map[string]string{"k_1": "v_2"}, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime, + ). // name doesn't match addMetric( []string{"404"}, 3.3, map[string]string{"k_1": "v_1"}, - pmetric.MetricTypeGauge). - get(), - expected: []float64{0, 0}, + pmetric.MetricTypeGauge, + startTime, startTime, + ).get(), + expected: []float64{0, 0, 0}, }, { name: "mixed_input", @@ -160,19 +179,25 @@ func TestAdd(t *testing.T) { addMetric( allMetricNames, 1.1, map[string]string{"k_1": "v_1"}, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime.Add(time.Second), + ). addMetric( allMetricNames, 2.2, map[string]string{"k_2": "v_1"}, - pmetric.MetricTypeGauge). + pmetric.MetricTypeGauge, + startTime, startTime.Add(time.Second), + ). addMetric( allMetricNames, 3.3, map[string]string{"k_1": "v_1"}, - pmetric.MetricTypeGauge). - get(), + pmetric.MetricTypeGauge, + startTime.Add(time.Second), startTime.Add(2*time.Second), + ).get(), expected: []float64{ - 4.4, // sum 3.3, // last + 4.4, // sum + 2.2, // rate }, }, } { @@ -195,24 +220,25 @@ type testMetricSlice struct { ms pmetric.MetricSlice } -func newMetrics(resAttrs map[string]string) testMetricSlice { +func newMetrics(resAttrs map[string]string) *testMetricSlice { m := pmetric.NewMetrics() rm := m.ResourceMetrics().AppendEmpty() for k, v := range resAttrs { rm.Resource().Attributes().PutStr(k, v) } - return testMetricSlice{ + return &testMetricSlice{ m: m, ms: rm.ScopeMetrics().AppendEmpty().Metrics(), } } -func (tms testMetricSlice) addMetric( +func (tms *testMetricSlice) addMetric( names []string, val float64, attrs map[string]string, t pmetric.MetricType, -) testMetricSlice { + startTime, endTime time.Time, +) *testMetricSlice { for _, name := range names { m := tms.ms.AppendEmpty() m.SetName(name) @@ -220,6 +246,8 @@ func (tms testMetricSlice) addMetric( case pmetric.MetricTypeGauge: dp := m.SetEmptyGauge().DataPoints().AppendEmpty() dp.SetDoubleValue(val) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(startTime)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(endTime)) for k, v := range attrs { dp.Attributes().PutStr(k, v) } @@ -228,12 +256,19 @@ func (tms testMetricSlice) addMetric( return tms } -func (tms testMetricSlice) get() pmetric.Metrics { +func (tms *testMetricSlice) get() pmetric.Metrics { return tms.m } func getTestAggCfg() ([]string, []AggregationConfig) { cfgs := []AggregationConfig{ + { + Name: "test_last", + MatchLabelValues: map[string]string{ + "k_1": "v_1", + }, + Type: Last, + }, { Name: "test_sum", MatchLabelValues: map[string]string{ @@ -242,11 +277,11 @@ func getTestAggCfg() ([]string, []AggregationConfig) { Type: Sum, }, { - Name: "test_last", + Name: "test_rate", MatchLabelValues: map[string]string{ "k_1": "v_1", }, - Type: Last, + Type: Rate, }, }