diff --git a/exporter/awsemfexporter/datapoint.go b/exporter/awsemfexporter/datapoint.go index 031dd43e2eb8..e501a99e483a 100644 --- a/exporter/awsemfexporter/datapoint.go +++ b/exporter/awsemfexporter/datapoint.go @@ -115,6 +115,25 @@ type dataPointSplit struct { capacity int } +func (split *dataPointSplit) isNotFull() bool { + return split.length < split.capacity +} + +func (split *dataPointSplit) setMax(maxVal float64) { + split.cWMetricHistogram.Max = maxVal +} + +func (split *dataPointSplit) setMin(minVal float64) { + split.cWMetricHistogram.Min = minVal +} + +func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) { + split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal) + split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count)) + split.length++ + split.cWMetricHistogram.Count += count +} + // CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary. func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) { metric := dps.NumberDataPointSlice.At(i) @@ -211,11 +230,11 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, metric := dps.ExponentialHistogramDataPointSlice.At(idx) const splitThreshold = 100 - var currentBucketIndex = 0 + currentBucketIndex := 0 + currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1 + currentZeroIndex := 0 + currentNegativeIndex := 0 var datapoints []dataPoint - var currentPositiveIndex = metric.Positive().BucketCounts().Len() - 1 - var currentZeroIndex = 0 - var currentNegativeIndex = 0 totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len() if metric.ZeroCount() > 0 { totalBucketLen++ @@ -239,6 +258,17 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, for currentBucketIndex < totalBucketLen { // Create a new dataPointSplit with a capacity of up to splitThreshold buckets + capacity := splitThreshold + if totalBucketLen-currentBucketIndex < splitThreshold { + capacity = totalBucketLen - currentBucketIndex + } + + sum := 0.0 + // Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct. + if currentBucketIndex == 0 { + sum = metric.Sum() + } + split := dataPointSplit{ cWMetricHistogram: &cWMetricHistogram{ Values: []float64{}, @@ -246,27 +276,18 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, Max: metric.Max(), Min: metric.Min(), Count: 0, - Sum: 0, + Sum: sum, }, length: 0, - capacity: splitThreshold, + capacity: capacity, } - // Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct. - if currentBucketIndex == 0 { - split.cWMetricHistogram.Sum = metric.Sum() - } - - if totalBucketLen-currentBucketIndex < splitThreshold { - split.capacity = totalBucketLen - currentBucketIndex - } - - // Set mid-point of positive buckets in values/counts array. - currentBucketIndex, currentPositiveIndex = iteratePositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex, totalBucketLen) - // Set count of zero bucket in values/counts array. - currentBucketIndex, currentZeroIndex = iterateZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex, totalBucketLen) - // Set mid-point of negative buckets in values/counts array. - currentBucketIndex, currentNegativeIndex = iterateNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex, totalBucketLen) + // Set collect values from positive buckets and save into split. + currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex) + // Set collect values from zero buckets and save into split. + currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex) + // Set collect values from negative buckets and save into split. + currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex) // Add the current split to the datapoints list datapoints = append(datapoints, dataPoint{ @@ -276,10 +297,19 @@ func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, timestampMs: unixNanoToMilliseconds(metric.Timestamp()), }) } + + // Override the min and max values of the first and last splits with the raw data of the metric. + datapoints[0].value.(*cWMetricHistogram).Max = metric.Max() + datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min() + return datapoints, true } -func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int, totalBucketLen int) (int, int) { +func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) { + if !split.isNotFull() || currentPositiveIndex < 0 { + return currentBucketIndex, currentPositiveIndex + } + scale := metric.Scale() base := math.Pow(2, math.Pow(2, float64(-scale))) positiveBuckets := metric.Positive() @@ -288,7 +318,7 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis bucketBegin := 0.0 bucketEnd := 0.0 - for split.length < split.capacity && currentPositiveIndex >= 0 { + for split.isNotFull() && currentPositiveIndex >= 0 { index := currentPositiveIndex + int(positiveOffset) if bucketEnd == 0 { bucketEnd = math.Pow(base, float64(index+1)) @@ -299,15 +329,14 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis metricVal := (bucketBegin + bucketEnd) / 2 count := positiveBucketCounts.At(currentPositiveIndex) if count > 0 { - split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal) - split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count)) - split.length++ - split.cWMetricHistogram.Count += count - if split.length == 1 && currentBucketIndex != 0 { - split.cWMetricHistogram.Max = bucketEnd + split.appendMetricData(metricVal, count) + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(bucketEnd) } - if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 { - split.cWMetricHistogram.Min = bucketBegin + if !split.isNotFull() { + split.setMin(bucketBegin) } } currentBucketIndex++ @@ -317,17 +346,16 @@ func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHis return currentBucketIndex, currentPositiveIndex } -func iterateZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int, totalBucketLen int) (int, int) { - if metric.ZeroCount() > 0 && split.length < split.capacity && currentZeroIndex == 0 { - split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, 0) - split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(metric.ZeroCount())) - split.length++ - split.cWMetricHistogram.Count += metric.ZeroCount() - if split.length == 1 && currentBucketIndex != 0 { - split.cWMetricHistogram.Max = 0 +func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) { + if metric.ZeroCount() > 0 && split.isNotFull() && currentZeroIndex == 0 { + split.appendMetricData(0, metric.ZeroCount()) + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(0) } - if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 { - split.cWMetricHistogram.Min = 0 + if !split.isNotFull() { + split.setMin(0) } currentZeroIndex++ currentBucketIndex++ @@ -336,12 +364,16 @@ func iterateZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogra return currentBucketIndex, currentZeroIndex } -func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int, totalBucketLen int) (int, int) { +func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) { // According to metrics spec, the value in histogram is expected to be non-negative. // https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram // However, the negative support is defined in metrics data model. // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram // The negative is also supported but only verified with unit test. + if !split.isNotFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() { + return currentBucketIndex, currentNegativeIndex + } + scale := metric.Scale() base := math.Pow(2, math.Pow(2, float64(-scale))) negativeBuckets := metric.Negative() @@ -350,7 +382,7 @@ func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHis bucketBegin := 0.0 bucketEnd := 0.0 - for split.length < split.capacity && currentNegativeIndex < metric.Negative().BucketCounts().Len() { + for split.isNotFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() { index := currentNegativeIndex + int(negativeOffset) if bucketEnd == 0 { bucketEnd = -math.Pow(base, float64(index)) @@ -361,15 +393,14 @@ func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHis metricVal := (bucketBegin + bucketEnd) / 2 count := negativeBucketCounts.At(currentNegativeIndex) if count > 0 { - split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal) - split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count)) - split.length++ - split.cWMetricHistogram.Count += count - if split.length == 1 && currentBucketIndex != 0 { - split.cWMetricHistogram.Max = bucketEnd + split.appendMetricData(metricVal, count) + + // The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value) + if split.length == 1 { + split.setMax(bucketEnd) } - if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 { - split.cWMetricHistogram.Min = bucketBegin + if !split.isNotFull() { + split.setMin(bucketBegin) } } currentBucketIndex++ diff --git a/exporter/awsemfexporter/datapoint_test.go b/exporter/awsemfexporter/datapoint_test.go index a3004608b0c8..d61a599d8632 100644 --- a/exporter/awsemfexporter/datapoint_test.go +++ b/exporter/awsemfexporter/datapoint_test.go @@ -1347,6 +1347,51 @@ func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSliceWithSplitDat }, }, }, + { + name: "Exponential histogram with more than 100 buckets, including positive, negative and zero buckets with zero counts", + histogramDPS: func() pmetric.ExponentialHistogramDataPointSlice { + histogramDPS := pmetric.NewExponentialHistogramDataPointSlice() + histogramDP := histogramDPS.AppendEmpty() + posBucketCounts := make([]uint64, 60) + for i := range posBucketCounts { + posBucketCounts[i] = uint64(i % 5) + } + histogramDP.Positive().BucketCounts().FromRaw(posBucketCounts) + histogramDP.SetZeroCount(2) + negBucketCounts := make([]uint64, 60) + for i := range negBucketCounts { + negBucketCounts[i] = uint64(i % 5) + } + histogramDP.Negative().BucketCounts().FromRaw(negBucketCounts) + histogramDP.SetSum(1000) + histogramDP.SetMin(-9e+17) + histogramDP.SetMax(9e+17) + histogramDP.SetCount(uint64(3662)) + histogramDP.Attributes().PutStr("label1", "value1") + return histogramDPS + }(), + expectedDatapoints: []dataPoint{ + { + name: "foo", + value: &cWMetricHistogram{ + Values: []float64{8.646911284551352e+17, 4.323455642275676e+17, 2.161727821137838e+17, 1.080863910568919e+17, 2.7021597764222976e+16, + 1.3510798882111488e+16, 6.755399441055744e+15, 3.377699720527872e+15, 8.44424930131968e+14, 4.22212465065984e+14, 2.11106232532992e+14, + 1.05553116266496e+14, 2.6388279066624e+13, 1.3194139533312e+13, 6.597069766656e+12, 3.298534883328e+12, 8.24633720832e+11, 4.12316860416e+11, + 2.06158430208e+11, 1.03079215104e+11, 2.5769803776e+10, 1.2884901888e+10, 6.442450944e+09, 3.221225472e+09, 8.05306368e+08, 4.02653184e+08, + 2.01326592e+08, 1.00663296e+08, 2.5165824e+07, 1.2582912e+07, 6.291456e+06, 3.145728e+06, 786432, 393216, 196608, 98304, 24576, 12288, 6144, 3072, + 768, 384, 192, 96, 24, 12, 6, 3, 0, -3, -6, -12, -24, -96, -192, -384, -768, -3072, -6144, -12288, -24576, -98304, -196608, -393216, -786432, + -3.145728e+06, -6.291456e+06, -1.2582912e+07, -2.5165824e+07, -1.00663296e+08, -2.01326592e+08, -4.02653184e+08, -8.05306368e+08, -3.221225472e+09, + -6.442450944e+09, -1.2884901888e+10, -2.5769803776e+10, -1.03079215104e+11, -2.06158430208e+11, -4.12316860416e+11, -8.24633720832e+11, -3.298534883328e+12, + -6.597069766656e+12, -1.3194139533312e+13, -2.6388279066624e+13, -1.05553116266496e+14, -2.11106232532992e+14, -4.22212465065984e+14, -8.44424930131968e+14, + -3.377699720527872e+15, -6.755399441055744e+15, -1.3510798882111488e+16, -2.7021597764222976e+16, -1.080863910568919e+17, -2.161727821137838e+17, + -4.323455642275676e+17, -8.646911284551352e+17}, + Counts: []float64{4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, 2, 1, 4, 3, + 2, 1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4}, + Sum: 1000, Count: 242, Min: -9e+17, Max: 9e+17}, + labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"}, + }, + }, + }, } for _, tc := range testCases {