Skip to content

Commit

Permalink
Mark all seen series as stale at the end of the test
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Dec 1, 2022
1 parent fddb22f commit 023b1f2
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 6 deletions.
80 changes: 74 additions & 6 deletions pkg/remotewrite/remotewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package remotewrite
import (
"context"
"fmt"
"math"
"strings"
"time"

Expand All @@ -15,7 +16,15 @@ import (
prompb "go.buf.build/grpc/go/prometheus/prometheus"
)

var _ output.Output = new(Output)
var (
_ output.Output = new(Output)

// staleNaN is the Prometheus special value for marking
// a time series as stale.
//
// https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants
staleNaN = math.Float64frombits(0x7ff0000000000002)
)

type Output struct {
output.SampleBuffer
Expand Down Expand Up @@ -80,11 +89,38 @@ func (o *Output) Start() error {

func (o *Output) Stop() error {
o.logger.Debug("Stopping the output")
defer o.logger.Debug("Output stopped")
o.periodicFlusher.Stop()
o.logger.Debug("Output stopped")

staleMarkers := o.staleMarkers(time.Now())
o.logger.WithField("staleMarkers", len(staleMarkers)).Debug("Marking time series as stale, if any")
if len(staleMarkers) < 1 {
return nil
}

// TODO: warn if it the request is taking too long (5 seconds?, the same timeout set for flushing)

err := o.client.Store(context.Background(), staleMarkers)
if err != nil {
return fmt.Errorf("marking time series as stale failed: %w", err)
}
return nil
}

func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries {
var staleMarkers []*prompb.TimeSeries
for _, swm := range o.tsdb {
if swm.Metric.Type == metrics.Trend && o.config.TrendAsNativeHistogram.Bool {
// stale marker is not required
// for native histograms
continue
}
swm.MarkStale(t)
staleMarkers = append(staleMarkers, swm.MapPrompb()...)
}
return staleMarkers
}

// setTrendStatsResolver sets the resolver for the Trend stats.
//
// TODO: refactor, the code can be improved
Expand Down Expand Up @@ -246,10 +282,11 @@ type seriesWithMeasure struct {
// in a method in struct
Latest time.Time

Stale bool

// TODO: maybe add some caching for the mapping?
}

// TODO: unit test this
func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
var newts []*prompb.TimeSeries

Expand All @@ -267,19 +304,34 @@ func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
switch swm.Metric.Type {
case metrics.Counter:
ts := mapMonoSeries(swm.TimeSeries, swm.Latest)
ts.Samples[0].Value = swm.Measure.(*metrics.CounterSink).Value
// TODO: refactor, this is too complex
// try to refactor this logic in a dedicated struct / function
// it will be easier to test and to read
if !swm.Stale {
ts.Samples[0].Value = swm.Measure.(*metrics.CounterSink).Value
} else {
ts.Samples[0].Value = staleNaN
}
newts = []*prompb.TimeSeries{&ts}

case metrics.Gauge:
ts := mapMonoSeries(swm.TimeSeries, swm.Latest)
ts.Samples[0].Value = swm.Measure.(*metrics.GaugeSink).Value
if !swm.Stale {
ts.Samples[0].Value = swm.Measure.(*metrics.GaugeSink).Value
} else {
ts.Samples[0].Value = staleNaN
}
newts = []*prompb.TimeSeries{&ts}

case metrics.Rate:
ts := mapMonoSeries(swm.TimeSeries, swm.Latest)
// pass zero duration here because time is useless for formatting rate
rateVals := swm.Measure.(*metrics.RateSink).Format(time.Duration(0))
ts.Samples[0].Value = rateVals["rate"]
if !swm.Stale {
ts.Samples[0].Value = rateVals["rate"]
} else {
ts.Samples[0].Value = staleNaN
}
newts = []*prompb.TimeSeries{&ts}

case metrics.Trend:
Expand All @@ -292,12 +344,28 @@ func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
panic("Measure for Trend types must implement MapPromPb")
}
newts = trend.MapPrompb(swm.TimeSeries, swm.Latest)

// TODO: create unit test for native histogram
// TODO: this sub-optimized, it is useless to generate
// all the stats with real values then replace them with StaleNaN.
// Find a way for doing it directly without pass through stats generation.

if swm.Stale {
for _, staleSeries := range newts {
staleSeries.Samples[0].Value = staleNaN
}
}
default:
panic(fmt.Sprintf("Something is really off, as I cannot recognize the type of metric %s: `%s`", swm.Metric.Name, swm.Metric.Type))
}
return newts
}

func (swm *seriesWithMeasure) MarkStale(t time.Time) {
swm.Stale = true
swm.Latest = t
}

type prompbMapper interface {
MapPrompb(series metrics.TimeSeries, t time.Time) []*prompb.TimeSeries
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/remotewrite/remotewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remotewrite

import (
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -273,3 +274,52 @@ func TestOutputSetTrendStatsResolver(t *testing.T) {
}())
}
}

func TestOutputStaleMarkers(t *testing.T) {
t.Parallel()

registry := metrics.NewRegistry()
trendSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric1", metrics.Trend),
Tags: registry.RootTagSet(),
}
counterSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric2", metrics.Counter),
Tags: registry.RootTagSet(),
}

o := Output{}
err := o.setTrendStatsResolver([]string{"p(99)"})
require.NoError(t, err)
trendSink, err := newExtendedTrendSink(o.trendStatsResolver)
require.NoError(t, err)

o.tsdb = map[metrics.TimeSeries]*seriesWithMeasure{
trendSinkSeries: {
TimeSeries: trendSinkSeries,
Latest: time.Now(),
// TODO: if Measure would be a lighter interface
// then it could be just a mapper mock.
Measure: trendSink,
},
counterSinkSeries: {
TimeSeries: counterSinkSeries,
Latest: time.Now(),
Measure: &metrics.CounterSink{},
},
}

now := time.Now()
markers := o.staleMarkers(now)
require.Len(t, markers, 2)

sortByNameLabel(markers)

assert.Equal(t, now.UnixMilli(), markers[0].Samples[0].Timestamp)
assert.Equal(t, "k6_metric1_p99", markers[0].Labels[0].Value)
assert.True(t, math.IsNaN(markers[0].Samples[0].Value), "it isn't a StaleNaN value")

assert.Equal(t, now.UnixMilli(), markers[1].Samples[0].Timestamp)
assert.Equal(t, "k6_metric2", markers[1].Labels[0].Value)
assert.True(t, math.IsNaN(markers[0].Samples[0].Value), "it isn't a StaleNaN value")
}

0 comments on commit 023b1f2

Please sign in to comment.