diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go
index 4c4107de1a1..707ca6241e9 100644
--- a/metrics/engine/engine.go
+++ b/metrics/engine/engine.go
@@ -67,6 +67,7 @@ func (me *MetricsEngine) CreateIngester() output.Output {
 	me.outputIngester = &outputIngester{
 		logger:        me.logger.WithField("component", "metrics-engine-ingester"),
 		metricsEngine: me,
+		cardinality:   newCardinalityControl(),
 	}
 	return me.outputIngester
 }
diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go
index a8736af8c95..62ffee8293e 100644
--- a/metrics/engine/ingester.go
+++ b/metrics/engine/ingester.go
@@ -4,10 +4,14 @@ import (
 	"time"
 
 	"github.com/sirupsen/logrus"
+	"go.k6.io/k6/metrics"
 	"go.k6.io/k6/output"
 )
 
-const collectRate = 50 * time.Millisecond
+const (
+	collectRate          = 50 * time.Millisecond
+	timeSeriesFirstLimit = 100_000
+)
 
 var _ output.Output = &outputIngester{}
 
@@ -24,6 +28,7 @@ type outputIngester struct {
 
 	metricsEngine   *MetricsEngine
 	periodicFlusher *output.PeriodicFlusher
+	cardinality     *cardinalityControl
 }
 
 // Description returns a human-readable description of the output.
@@ -92,6 +97,67 @@ func (oi *outputIngester) flushMetrics() {
 				oi.metricsEngine.markObserved(sm.Metric)
 				sm.Metric.Sink.Add(sample)
 			}
+
+			oi.cardinality.Add(sample.TimeSeries)
 		}
 	}
+
+	if oi.cardinality.LimitHit() {
+		// TODO: suggest using the Metadata API as an alternative, once it's
+		// available (e.g. move high-cardinality tags as Metadata)
+		// https://github.com/grafana/k6/issues/2766
+
+		oi.logger.Warnf(
+			"The test has generated metrics with %d unique time series, "+
+				"which is higher than the suggested limit of %d "+
+				"and could cause high memory usage. "+
+				"Consider not using high-cardinality values like unique IDs as metric tags "+
+				"or, if you need them in the URL, use the name metric tag or URL grouping. "+
+				"See https://k6.io/docs/using-k6/tags-and-groups for details.", oi.cardinality.Count(), timeSeriesFirstLimit)
+	}
+}
+
+// cardinalityControl keeps track of the distinct time series seen so far
+// and of the limit above which the next cardinality warning is due.
+type cardinalityControl struct {
+	seen            map[metrics.TimeSeries]struct{}
+	timeSeriesLimit int
+}
+
+// newCardinalityControl returns a cardinalityControl with no seen time series
+// and the limit set to timeSeriesFirstLimit.
+func newCardinalityControl() *cardinalityControl {
+	return &cardinalityControl{
+		timeSeriesLimit: timeSeriesFirstLimit,
+		seen:            make(map[metrics.TimeSeries]struct{}),
+	}
+}
+
+// Add adds the passed time series to the set of seen items.
+// Adding a time series that has already been seen is a no-op.
+func (cc *cardinalityControl) Add(ts metrics.TimeSeries) {
+	cc.seen[ts] = struct{}{}
+}
+
+// LimitHit checks if the cardinality limit has been hit.
+// On a hit the limit is doubled until it is no lower than the number of
+// seen time series, so that the next hit requires new time series.
+func (cc *cardinalityControl) LimitHit() bool {
+	if len(cc.seen) <= cc.timeSeriesLimit {
+		return false
+	}
+
+	// we don't care about overflow
+	// the process should be already OOM
+	// if the number of generated time series goes higher than N-hundred-million(s).
+	// The check for a positive limit only keeps the loop finite.
+	for cc.timeSeriesLimit > 0 && len(cc.seen) > cc.timeSeriesLimit {
+		cc.timeSeriesLimit *= 2
+	}
+	return true
+}
+
+// Count returns the number of distinct seen time series.
+func (cc *cardinalityControl) Count() int {
+	return len(cc.seen)
 }
diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go
index 3bb155243bd..cb292bd2317 100644
--- a/metrics/engine/ingester_test.go
+++ b/metrics/engine/ingester_test.go
@@ -1,6 +1,7 @@
 package engine
 
 import (
+	"strconv"
 	"testing"
 
 	"github.com/sirupsen/logrus"
@@ -23,6 +24,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) {
 		metricsEngine: &MetricsEngine{
 			ObservedMetrics: make(map[string]*metrics.Metric),
 		},
+		cardinality: newCardinalityControl(),
 	}
 	require.NoError(t, ingester.Start())
 	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
@@ -67,6 +69,7 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) {
 	ingester := outputIngester{
 		logger:        piState.Logger,
 		metricsEngine: me,
+		cardinality:   newCardinalityControl(),
 	}
 	require.NoError(t, ingester.Start())
 	ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
@@ -96,6 +99,127 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) {
 	assert.IsType(t, &metrics.GaugeSink{}, metric.Sink)
 }
 
+func TestOutputFlushMetricsTimeSeriesWarning(t *testing.T) {
+	t.Parallel()
+
+	piState := newTestPreInitState(t)
+	testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Gauge)
+	require.NoError(t, err)
+
+	logger, hook := testutils.NewMemLogger()
+	ingester := outputIngester{
+		logger: logger,
+		metricsEngine: &MetricsEngine{
+			ObservedMetrics: make(map[string]*metrics.Metric),
+		},
+		cardinality: newCardinalityControl(),
+	}
+	ingester.cardinality.timeSeriesLimit = 2 // mock the limit
+
+	require.NoError(t, ingester.Start())
+	for i := 0; i < 3; i++ {
+		ingester.AddMetricSamples([]metrics.SampleContainer{metrics.Sample{
+			TimeSeries: metrics.TimeSeries{
+				Metric: testMetric,
+				Tags: piState.Registry.RootTagSet().WithTagsFromMap(
+					map[string]string{"a": "1", "b": strconv.Itoa(i)}),
+			},
+			Value: 21,
+		}})
+	}
+	require.NoError(t, ingester.Stop())
+
+	// to keep things simple the internal limit is not passed to the message
+	// the code uses directly the global constant limit
+	expLine := "generated metrics with 3 unique time series, " +
+		"which is higher than the suggested limit of 100000"
+	assert.True(t, testutils.LogContains(hook.Drain(), logrus.WarnLevel, expLine))
+}
+
+func TestCardinalityControlAdd(t *testing.T) {
+	t.Parallel()
+
+	registry := metrics.NewRegistry()
+	m1, err := registry.NewMetric("metric1", metrics.Counter)
+	require.NoError(t, err)
+
+	m2, err := registry.NewMetric("metric2", metrics.Counter)
+	require.NoError(t, err)
+
+	tags := registry.RootTagSet().With("k", "v")
+
+	cc := newCardinalityControl()
+	// the first iteration adds two new time series
+	// the second does not change the count
+	// because the time series have been already seen before
+	for i := 0; i < 2; i++ {
+		cc.Add(metrics.TimeSeries{
+			Metric: m1,
+			Tags:   tags,
+		})
+		cc.Add(metrics.TimeSeries{
+			Metric: m2,
+			Tags:   tags,
+		})
+		assert.Equal(t, 2, len(cc.seen))
+	}
+}
+
+func TestCardinalityControlLimitHit(t *testing.T) {
+	t.Parallel()
+
+	registry := metrics.NewRegistry()
+	m1, err := registry.NewMetric("metric1", metrics.Counter)
+	require.NoError(t, err)
+
+	cc := newCardinalityControl()
+	cc.timeSeriesLimit = 1
+
+	cc.Add(metrics.TimeSeries{
+		Metric: m1,
+		Tags:   registry.RootTagSet().With("k", "1"),
+	})
+	assert.False(t, cc.LimitHit())
+
+	// the same time series should not impact the counter
+	cc.Add(metrics.TimeSeries{
+		Metric: m1,
+		Tags:   registry.RootTagSet().With("k", "1"),
+	})
+	assert.False(t, cc.LimitHit())
+
+	cc.Add(metrics.TimeSeries{
+		Metric: m1,
+		Tags:   registry.RootTagSet().With("k", "2"),
+	})
+	assert.True(t, cc.LimitHit())
+	assert.Equal(t, 2, cc.timeSeriesLimit, "the limit is expected to be raised")
+}
+
+func TestCardinalityControlLimitHitRaisesLimitPastCount(t *testing.T) {
+	t.Parallel()
+
+	registry := metrics.NewRegistry()
+	m1, err := registry.NewMetric("metric1", metrics.Counter)
+	require.NoError(t, err)
+
+	cc := newCardinalityControl()
+	cc.timeSeriesLimit = 1
+
+	// more than double the limit before the first check
+	for i := 0; i < 5; i++ {
+		cc.Add(metrics.TimeSeries{
+			Metric: m1,
+			Tags:   registry.RootTagSet().With("k", strconv.Itoa(i)),
+		})
+	}
+	assert.True(t, cc.LimitHit())
+	assert.Equal(t, 8, cc.timeSeriesLimit, "the limit is expected to be raised past the count")
+
+	// without new time series the limit must not be hit again
+	assert.False(t, cc.LimitHit())
+}
+
 func newTestPreInitState(tb testing.TB) *lib.TestPreInitState {
 	reg := metrics.NewRegistry()
 	logger := testutils.NewLogger(tb)