diff --git a/pkg/remotewrite/remotewrite.go b/pkg/remotewrite/remotewrite.go index cf7f59c..d2244f5 100644 --- a/pkg/remotewrite/remotewrite.go +++ b/pkg/remotewrite/remotewrite.go @@ -3,6 +3,7 @@ package remotewrite import ( "context" "fmt" + "math" "strings" "time" @@ -15,7 +16,15 @@ import ( prompb "go.buf.build/grpc/go/prometheus/prometheus" ) -var _ output.Output = new(Output) +var ( + _ output.Output = new(Output) + + // staleNaN is the Prometheus special value for marking + // a time series as stale. + // + // https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants + staleNaN = math.Float64frombits(0x7ff0000000000002) +) type Output struct { output.SampleBuffer @@ -80,11 +89,51 @@ func (o *Output) Start() error { func (o *Output) Stop() error { o.logger.Debug("Stopping the output") + defer o.logger.Debug("Output stopped") o.periodicFlusher.Stop() - o.logger.Debug("Output stopped") + + staleMarkers := o.staleMarkers(time.Now()) + + if len(staleMarkers) < 1 { + o.logger.Debug("No time series to mark as stale") + return nil + } + o.logger.WithField("staleMarkers", len(staleMarkers)).Debug("Marking time series as stale") + + err := o.client.Store(context.Background(), staleMarkers) + if err != nil { + return fmt.Errorf("marking time series as stale failed: %w", err) + } return nil } +// staleMarkers maps all the seen time series with a stale marker. +func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries { + timestamp := t.UnixMilli() + staleMarkers := make([]*prompb.TimeSeries, 0, len(o.tsdb)) + + for _, swm := range o.tsdb { + series := swm.MapPrompb() + // series' length is expected to be equal to 1 for most of the cases + // the unique exception where more than 1 is expected is when + // trend stats have been configured with multiple values. + for _, s := range series { + if len(s.Samples) < 1 { + if len(s.Histograms) < 1 { + panic("data integrity check: samples and native histograms" + + " can't be empty at the same time") + } + s.Samples = append(s.Samples, &prompb.Sample{}) + } + + s.Samples[0].Value = staleNaN + s.Samples[0].Timestamp = timestamp + } + staleMarkers = append(staleMarkers, series...) + } + return staleMarkers +} + // setTrendStatsResolver sets the resolver for the Trend stats. // // TODO: refactor, the code can be improved @@ -249,7 +298,7 @@ type seriesWithMeasure struct { // TODO: maybe add some caching for the mapping? } -// TODO: unit test this +// TODO: add unit tests func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries { var newts []*prompb.TimeSeries @@ -290,6 +339,7 @@ func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries { panic("Measure for Trend types must implement MapPromPb") } newts = trend.MapPrompb(swm.TimeSeries, swm.Latest) + default: panic(fmt.Sprintf("Something is really off, as I cannot recognize the type of metric %s: `%s`", swm.Metric.Name, swm.Metric.Type)) } diff --git a/pkg/remotewrite/remotewrite_test.go b/pkg/remotewrite/remotewrite_test.go index c2ae8d7..ab49b13 100644 --- a/pkg/remotewrite/remotewrite_test.go +++ b/pkg/remotewrite/remotewrite_test.go @@ -2,6 +2,7 @@ package remotewrite import ( "fmt" + "math" "testing" "time" @@ -291,3 +292,50 @@ func TestOutputSetTrendStatsResolver(t *testing.T) { }()) } } + +func TestOutputStaleMarkers(t *testing.T) { + t.Parallel() + + registry := metrics.NewRegistry() + trendSinkSeries := metrics.TimeSeries{ + Metric: registry.MustNewMetric("metric1", metrics.Trend), + Tags: registry.RootTagSet(), + } + counterSinkSeries := metrics.TimeSeries{ + Metric: registry.MustNewMetric("metric2", metrics.Counter), + Tags: registry.RootTagSet(), + } + + o := Output{} + err := o.setTrendStatsResolver([]string{"p(99)"}) + require.NoError(t, err) + trendSink, err := newExtendedTrendSink(o.trendStatsResolver) + require.NoError(t, err) + + o.tsdb = map[metrics.TimeSeries]*seriesWithMeasure{ + trendSinkSeries: { + TimeSeries: trendSinkSeries, + Latest: time.Now(), + // TODO: if Measure would be a lighter interface + // then it could be just a mapper mock. + Measure: trendSink, + }, + counterSinkSeries: { + TimeSeries: counterSinkSeries, + Latest: time.Now(), + Measure: &metrics.CounterSink{}, + }, + } + + now := time.Now() + markers := o.staleMarkers(now) + require.Len(t, markers, 2) + + sortByNameLabel(markers) + expNameLabels := []string{"k6_metric1_p99", "k6_metric2_total"} + for i, expName := range expNameLabels { + assert.Equal(t, expName, markers[i].Labels[0].Value) + assert.Equal(t, now.UnixMilli(), markers[i].Samples[0].Timestamp) + assert.True(t, math.IsNaN(markers[i].Samples[0].Value), "it isn't a StaleNaN value") + } +}