Skip to content

Commit

Permalink
Warn when too many time series are generated
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Mar 29, 2023
1 parent 18b8ab1 commit dccdf1b
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 1 deletion.
1 change: 1 addition & 0 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (me *MetricsEngine) CreateIngester() output.Output {
me.outputIngester = &outputIngester{
logger: me.logger.WithField("component", "metrics-engine-ingester"),
metricsEngine: me,
cardinality: newCardinalityControl(),
}
return me.outputIngester
}
Expand Down
61 changes: 60 additions & 1 deletion metrics/engine/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
)

const collectRate = 50 * time.Millisecond
const (
	// collectRate is presumably the interval at which the output's
	// PeriodicFlusher drains buffered samples — confirm against Start().
	collectRate = 50 * time.Millisecond
	// timeSeriesFirstLimit is the initial cardinality threshold: once more
	// unique time series than this have been observed, a high-memory-usage
	// warning is logged. The internal threshold then doubles on each hit so
	// the warning is not repeated for every additional time series.
	timeSeriesFirstLimit = 100_000
)

var _ output.Output = &outputIngester{}

Expand All @@ -24,6 +28,7 @@ type outputIngester struct {

metricsEngine *MetricsEngine
periodicFlusher *output.PeriodicFlusher
cardinality *cardinalityControl
}

// Description returns a human-readable description of the output.
Expand Down Expand Up @@ -92,6 +97,60 @@ func (oi *outputIngester) flushMetrics() {
oi.metricsEngine.markObserved(sm.Metric)
sm.Metric.Sink.Add(sample)
}

oi.cardinality.Add(sample.TimeSeries)
}
}

if oi.cardinality.LimitHit() {
// TODO: suggest using the Metadata API as an alternative, once it's
// available (e.g. move high-cardinality tags as Metadata)
// https://github.com/grafana/k6/issues/2766

oi.logger.Warnf(
"The test has generated metrics with %d unique time series, "+
"which is higher than the suggested limit of %d "+
"and could cause high memory usage. "+
"Consider not using high-cardinality values like unique IDs as metric tags "+
"or, if you need them in the URL, use the name metric tag or URL grouping. "+
"See https://k6.io/docs/using-k6/tags-and-groups for details.", oi.cardinality.Count(), timeSeriesFirstLimit)
}
}

// cardinalityControl tracks the set of distinct time series observed by the
// ingester and detects when their number crosses a progressively doubling
// warning threshold.
type cardinalityControl struct {
	// seen is the set of unique time series observed so far.
	seen map[metrics.TimeSeries]struct{}
	// timeSeriesLimit is the current warning threshold; it is doubled each
	// time LimitHit reports true, so subsequent warnings fire only after
	// significant further growth.
	timeSeriesLimit int
}

// newCardinalityControl initializes a cardinalityControl with an empty set of
// seen time series and the default first warning threshold.
func newCardinalityControl() *cardinalityControl {
	cc := cardinalityControl{
		seen:            make(map[metrics.TimeSeries]struct{}),
		timeSeriesLimit: timeSeriesFirstLimit,
	}
	return &cc
}

// Add records the passed time series as seen. Adding an already-seen time
// series is a no-op, so the tracked count always equals the number of
// distinct series.
func (cc *cardinalityControl) Add(ts metrics.TimeSeries) {
	// A map assignment is already idempotent, so the previous
	// "check then insert" double lookup was redundant.
	cc.seen[ts] = struct{}{}
}

// LimitHit reports whether the number of distinct time series has grown past
// the current threshold. When it has, the threshold is doubled so the next
// warning fires only after substantial further growth.
func (cc *cardinalityControl) LimitHit() bool {
	hit := len(cc.seen) > cc.timeSeriesLimit
	if hit {
		// Overflow is not a practical concern: the process would already be
		// OOM long before hundreds of millions of time series could make the
		// doubled limit wrap around.
		cc.timeSeriesLimit *= 2
	}
	return hit
}

// Count reports how many distinct time series have been observed so far.
func (cc *cardinalityControl) Count() int {
	distinct := len(cc.seen)
	return distinct
}
100 changes: 100 additions & 0 deletions metrics/engine/ingester_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package engine

import (
"strconv"
"testing"

"github.com/sirupsen/logrus"
Expand All @@ -23,6 +24,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) {
metricsEngine: &MetricsEngine{
ObservedMetrics: make(map[string]*metrics.Metric),
},
cardinality: newCardinalityControl(),
}
require.NoError(t, ingester.Start())
ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
Expand Down Expand Up @@ -67,6 +69,7 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) {
ingester := outputIngester{
logger: piState.Logger,
metricsEngine: me,
cardinality: newCardinalityControl(),
}
require.NoError(t, ingester.Start())
ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
Expand Down Expand Up @@ -96,6 +99,103 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) {
assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
}

// TestOutputFlushMetricsTimeSeriesWarning checks that flushing samples with
// more unique time series than the (mocked) threshold logs a warning.
func TestOutputFlushMetricsTimeSeriesWarning(t *testing.T) {
	t.Parallel()

	preInitState := newTestPreInitState(t)
	gauge, err := preInitState.Registry.NewMetric("test_metric", metrics.Gauge)
	require.NoError(t, err)

	memLogger, logHook := testutils.NewMemLogger()
	ingester := outputIngester{
		logger: memLogger,
		metricsEngine: &MetricsEngine{
			ObservedMetrics: make(map[string]*metrics.Metric),
		},
		cardinality: newCardinalityControl(),
	}
	ingester.cardinality.timeSeriesLimit = 2 // mock the limit

	require.NoError(t, ingester.Start())
	for seriesIdx := 0; seriesIdx < 3; seriesIdx++ {
		sample := metrics.Sample{
			TimeSeries: metrics.TimeSeries{
				Metric: gauge,
				Tags: preInitState.Registry.RootTagSet().WithTagsFromMap(
					map[string]string{"a": "1", "b": strconv.Itoa(seriesIdx)}),
			},
			Value: 21,
		}
		ingester.AddMetricSamples([]metrics.SampleContainer{sample})
	}
	require.NoError(t, ingester.Stop())

	// to keep things simple the internal limit is not passed to the message
	// the code uses directly the global constant limit
	expLine := "generated metrics with 3 unique time series, " +
		"which is higher than the suggested limit of 100000"
	assert.True(t, testutils.LogContains(logHook.Drain(), logrus.WarnLevel, expLine))
}

// TestCardinalityControlAdd verifies that Add counts each distinct time
// series exactly once, no matter how often it is re-added.
func TestCardinalityControlAdd(t *testing.T) {
	t.Parallel()

	registry := metrics.NewRegistry()
	counter1, err := registry.NewMetric("metric1", metrics.Counter)
	require.NoError(t, err)

	counter2, err := registry.NewMetric("metric2", metrics.Counter)
	require.NoError(t, err)

	sharedTags := registry.RootTagSet().With("k", "v")
	series := []metrics.TimeSeries{
		{Metric: counter1, Tags: sharedTags},
		{Metric: counter2, Tags: sharedTags},
	}

	cc := newCardinalityControl()
	// The first pass introduces two new time series; the second pass re-adds
	// the same ones and must leave the count untouched.
	for pass := 0; pass < 2; pass++ {
		for _, ts := range series {
			cc.Add(ts)
		}
		assert.Equal(t, 2, len(cc.seen))
	}
}

// TestCardinalityControlLimitHit verifies that LimitHit fires only when the
// number of distinct series exceeds the threshold, and that hitting the
// threshold doubles it.
func TestCardinalityControlLimitHit(t *testing.T) {
	t.Parallel()

	registry := metrics.NewRegistry()
	counter, err := registry.NewMetric("metric1", metrics.Counter)
	require.NoError(t, err)

	cc := newCardinalityControl()
	cc.timeSeriesLimit = 1

	firstSeries := metrics.TimeSeries{
		Metric: counter,
		Tags:   registry.RootTagSet().With("k", "1"),
	}
	cc.Add(firstSeries)
	assert.False(t, cc.LimitHit())

	// the same time series should not impact the counter
	cc.Add(firstSeries)
	assert.False(t, cc.LimitHit())

	cc.Add(metrics.TimeSeries{
		Metric: counter,
		Tags:   registry.RootTagSet().With("k", "2"),
	})
	assert.True(t, cc.LimitHit())
	assert.Equal(t, 2, cc.timeSeriesLimit, "the limit is expected to be raised")
}

func newTestPreInitState(tb testing.TB) *lib.TestPreInitState {
reg := metrics.NewRegistry()
logger := testutils.NewLogger(tb)
Expand Down

0 comments on commit dccdf1b

Please sign in to comment.