Stale markers #73

Merged: 1 commit, Dec 6, 2022
56 changes: 53 additions & 3 deletions pkg/remotewrite/remotewrite.go
@@ -3,6 +3,7 @@ package remotewrite
import (
"context"
"fmt"
"math"
"strings"
"time"

@@ -15,7 +16,15 @@ import (
prompb "go.buf.build/grpc/go/prometheus/prometheus"
)

var (
_ output.Output = new(Output)

// staleNaN is the Prometheus special value for marking
// a time series as stale.
//
// https://pkg.go.dev/github.com/prometheus/prometheus/pkg/value#pkg-constants
staleNaN = math.Float64frombits(0x7ff0000000000002)
Member:
Would it make sense to make this particular value a constant?

Reviewer:
Can't we just use the StaleNaN constant from the prometheus package directly?

Contributor Author:
I'm looking into this, but I'm not sure the alternative is really better: the constant is a uint64, and a plain conversion to float64 would still be accepted. So if we forget to convert with math.Float64frombits(), we introduce a bug the compiler doesn't complain about, and we'd have to rely on unit tests alone to catch it. @oleiade WDYT?

Contributor Author:
> Can't we just use the StaleNaN constant from the prometheus package directly?

@imiric It would require including the Prometheus project as a direct dependency, and it would pull in a lot of dependencies.

)

type Output struct {
output.SampleBuffer
@@ -80,11 +89,51 @@ func (o *Output) Start() error {

func (o *Output) Stop() error {
o.logger.Debug("Stopping the output")
defer o.logger.Debug("Output stopped")
o.periodicFlusher.Stop()
o.logger.Debug("Output stopped")

staleMarkers := o.staleMarkers(time.Now())

if len(staleMarkers) < 1 {
o.logger.Debug("No time series to mark as stale")
return nil
}
o.logger.WithField("staleMarkers", len(staleMarkers)).Debug("Marking time series as stale")

err := o.client.Store(context.Background(), staleMarkers)
Reviewer:
Hmm, shouldn't this use some extension-wide context? Preferably one passed from k6? I now see that output.Params doesn't have a context. Maybe it should?

Contributor Author:
IMHO, this comes down to grafana/k6#2430 (comment). We should consider prioritizing it during the next cycle.

For this version, I think the only option we have is to implement the TODO directly now, logging in case of a long-running Stop execution. WDYT?

Contributor Author:
> For this version, I think the only option we have is to implement the TODO directly now, logging in case of a long-running Stop execution. WDYT?

Thinking more about this: the current client is already initialized with the Timeout value from the configuration, so the request times out once that interval has passed. That should be enough to warn about a long-running request and to flag that any bad performance between k6 and the remote-write endpoint needs investigating.

Reviewer:
I'm not sure what the best approach would be. Just thought it would probably make more sense to use an existing context, instead of using context.Background() everywhere.

Intuitively, it makes sense to me for the extension context to be based on one received by k6, so that the extension code can cancel any operations in progress if the k6 context is cancelled (due to a signal, test abort, etc.). But if you think that the timeout value from the configuration is enough, then we could go with that instead. I think it can be done in a separate PR, though.

Contributor Author:
> Intuitively, it makes sense to me for the extension context to be based on one received by k6, so that the extension code can cancel any operations in progress if the k6 context is cancelled (due to a signal, test abort, etc.).

Just to be clear, I agree, and this is something we want to add in the future; this is why the context is passed to the wait function in the Output refactor proposal (wait func(context.Context) error, err error).

Unfortunately, I think we can't do it now, because it would require some changes to k6 when the code is almost frozen.

if err != nil {
return fmt.Errorf("marking time series as stale failed: %w", err)
}
return nil
}

// staleMarkers maps all the seen time series with a stale marker.
func (o *Output) staleMarkers(t time.Time) []*prompb.TimeSeries {
timestamp := t.UnixMilli()
staleMarkers := make([]*prompb.TimeSeries, 0, len(o.tsdb))

for _, swm := range o.tsdb {
series := swm.MapPrompb()
// series' length is expected to be equal to 1 for most of the cases
// the unique exception where more than 1 is expected is when
// trend stats have been configured with multiple values.
for _, s := range series {
if len(s.Samples) < 1 {
if len(s.Histograms) < 1 {
panic("data integrity check: samples and native histograms" +
" can't be empty at the same time")
}
s.Samples = append(s.Samples, &prompb.Sample{})
}

s.Samples[0].Value = staleNaN
s.Samples[0].Timestamp = timestamp
}
staleMarkers = append(staleMarkers, series...)
}
return staleMarkers
}

// setTrendStatsResolver sets the resolver for the Trend stats.
//
// TODO: refactor, the code can be improved
@@ -249,7 +298,7 @@ type seriesWithMeasure struct {
// TODO: maybe add some caching for the mapping?
}

// TODO: add unit tests
func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
var newts []*prompb.TimeSeries

@@ -290,6 +339,7 @@ func (swm seriesWithMeasure) MapPrompb() []*prompb.TimeSeries {
panic("Measure for Trend types must implement MapPromPb")
}
newts = trend.MapPrompb(swm.TimeSeries, swm.Latest)

default:
panic(fmt.Sprintf("Something is really off, as I cannot recognize the type of metric %s: `%s`", swm.Metric.Name, swm.Metric.Type))
}
48 changes: 48 additions & 0 deletions pkg/remotewrite/remotewrite_test.go
@@ -2,6 +2,7 @@ package remotewrite

import (
"fmt"
"math"
"testing"
"time"

@@ -291,3 +292,50 @@ func TestOutputSetTrendStatsResolver(t *testing.T) {
}())
}
}

func TestOutputStaleMarkers(t *testing.T) {
t.Parallel()

registry := metrics.NewRegistry()
trendSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric1", metrics.Trend),
Tags: registry.RootTagSet(),
}
counterSinkSeries := metrics.TimeSeries{
Metric: registry.MustNewMetric("metric2", metrics.Counter),
Tags: registry.RootTagSet(),
}

o := Output{}
err := o.setTrendStatsResolver([]string{"p(99)"})
require.NoError(t, err)
trendSink, err := newExtendedTrendSink(o.trendStatsResolver)
require.NoError(t, err)

o.tsdb = map[metrics.TimeSeries]*seriesWithMeasure{
trendSinkSeries: {
TimeSeries: trendSinkSeries,
Latest: time.Now(),
// TODO: if Measure were a lighter interface,
// it could be just a mapper mock.
Measure: trendSink,
},
counterSinkSeries: {
TimeSeries: counterSinkSeries,
Latest: time.Now(),
Measure: &metrics.CounterSink{},
},
}

now := time.Now()
markers := o.staleMarkers(now)
require.Len(t, markers, 2)

sortByNameLabel(markers)
expNameLabels := []string{"k6_metric1_p99", "k6_metric2_total"}
for i, expName := range expNameLabels {
assert.Equal(t, expName, markers[i].Labels[0].Value)
assert.Equal(t, now.UnixMilli(), markers[i].Samples[0].Timestamp)
assert.True(t, math.IsNaN(markers[i].Samples[0].Value), "it isn't a StaleNaN value")
}
}