Skip to content

Commit

Permalink
Mark all seen series as stale at the end of the test
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Dec 5, 2022
1 parent 446a103 commit 235be21
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 3 deletions.
56 changes: 53 additions & 3 deletions pkg/remotewrite/remotewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package remotewrite
import (
"context"
"fmt"
"math"
"strings"
"time"

Expand All @@ -15,7 +16,15 @@ import (
prompb "go.buf.build/grpc/go/prometheus/prometheus"
)

var _ output.Output = new(Output)
var (
_ output.Output = new(Output)

// staleNaN is the Prometheus special value for marking
// a time series as stale.
//
// https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants
staleNaN = math.Float64frombits(0x7ff0000000000002)
)

type Output struct {
output.SampleBuffer
Expand Down Expand Up @@ -80,11 +89,51 @@ func (o *Output) Start() error {

func (o *Output) Stop() error {
o.logger.Debug("Stopping the output")
defer o.logger.Debug("Output stopped")
o.periodicFlusher.Stop()
o.logger.Debug("Output stopped")

staleMarkers := o.staleMarkers(time.Now())

if len(staleMarkers) < 1 {
o.logger.Debug("No time series to mark as stale")
return nil
}
o.logger.WithField("staleMarkers", len(staleMarkers)).Debug("Marking time series as stale")

err := o.client.Store(context.Background(), staleMarkers)
if err != nil {
return fmt.Errorf("marking time series as stale failed: %w", err)
}
return nil
}

// staleMarkers maps all the seen time series with a stale marker.
func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries {
timestamp := t.UnixMilli()
staleMarkers := make([]*prompb.TimeSeries, 0, len(o.tsdb))

for _, swm := range o.tsdb {
series := swm.MapPrompb()
// series' length is expected to be equal to 1 for most of the cases
// the unique exception where more than 1 is expected is when
// trend stats have been configured with multiple values.
for _, s := range series {
if len(s.Samples) < 1 {
if len(s.Histograms) < 1 {
panic("data integrity check: samples and native histograms" +
" can't be empty at the same time")
}
s.Samples = append(s.Samples, &prompb.Sample{})
}

s.Samples[0].Value = staleNaN
s.Samples[0].Timestamp = timestamp
}
staleMarkers = append(staleMarkers, series...)
}
return staleMarkers
}

// setTrendStatsResolver sets the resolver for the Trend stats.
//
// TODO: refactor, the code can be improved
Expand Down Expand Up @@ -249,7 +298,7 @@ type seriesWithMeasure struct {
// TODO: maybe add some caching for the mapping?
}

// TODO: unit test this
// TODO: add unit tests
func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
var newts []*prompb.TimeSeries

Expand Down Expand Up @@ -290,6 +339,7 @@ func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
panic("Measure for Trend types must implement MapPromPb")
}
newts = trend.MapPrompb(swm.TimeSeries, swm.Latest)

default:
panic(fmt.Sprintf("Something is really off, as I cannot recognize the type of metric %s: `%s`", swm.Metric.Name, swm.Metric.Type))
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/remotewrite/remotewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remotewrite

import (
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -291,3 +292,50 @@ func TestOutputSetTrendStatsResolver(t *testing.T) {
}())
}
}

func TestOutputStaleMarkers(t *testing.T) {
t.Parallel()

registry := metrics.NewRegistry()
trendSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric1", metrics.Trend),
Tags: registry.RootTagSet(),
}
counterSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric2", metrics.Counter),
Tags: registry.RootTagSet(),
}

o := Output{}
err := o.setTrendStatsResolver([]string{"p(99)"})
require.NoError(t, err)
trendSink, err := newExtendedTrendSink(o.trendStatsResolver)
require.NoError(t, err)

o.tsdb = map[metrics.TimeSeries]*seriesWithMeasure{
trendSinkSeries: {
TimeSeries: trendSinkSeries,
Latest: time.Now(),
// TODO: if Measure would be a lighter interface
// then it could be just a mapper mock.
Measure: trendSink,
},
counterSinkSeries: {
TimeSeries: counterSinkSeries,
Latest: time.Now(),
Measure: &metrics.CounterSink{},
},
}

now := time.Now()
markers := o.staleMarkers(now)
require.Len(t, markers, 2)

sortByNameLabel(markers)
expNameLabels := []string{"k6_metric1_p99", "k6_metric2_total"}
for i, expName := range expNameLabels {
assert.Equal(t, expName, markers[i].Labels[0].Value)
assert.Equal(t, now.UnixMilli(), markers[i].Samples[0].Timestamp)
assert.True(t, math.IsNaN(markers[i].Samples[0].Value), "it isn't a StaleNaN value")
}
}

0 comments on commit 235be21

Please sign in to comment.