From df8bc93413de66af14888a3efe428660202b8f1f Mon Sep 17 00:00:00 2001
From: HC Zhu
Date: Wed, 29 Mar 2023 15:17:21 -0700
Subject: [PATCH] Export top metrics with their cardinality

---
 src/cmd/services/m3dbnode/config/config.go |  10 ++
 src/dbnode/server/server.go                |  18 +++-
 src/dbnode/storage/namespace.go            |  46 +++++++--
 src/dbnode/storage/options.go              |  11 +++
 src/dbnode/storage/result.go               | 105 ++++++++++++++++++---
 src/dbnode/storage/result_test.go          |  38 ++++++++
 src/dbnode/storage/shard.go                |  41 +++++++-
 src/dbnode/storage/shard_race_prop_test.go |   6 +-
 src/dbnode/storage/shard_test.go           |  29 +++---
 src/dbnode/storage/storage_mock.go         |  11 ++-
 src/dbnode/storage/types.go                |  16 +++-
 11 files changed, 286 insertions(+), 45 deletions(-)
 create mode 100644 src/dbnode/storage/result_test.go

diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go
index 6825fd22ce..26ad6ae9f2 100644
--- a/src/cmd/services/m3dbnode/config/config.go
+++ b/src/cmd/services/m3dbnode/config/config.go
@@ -495,6 +495,16 @@ type TickConfiguration struct {
 
 	// Tick minimum interval controls the minimum tick interval for the node.
 	MinimumInterval time.Duration `yaml:"minimumInterval"`
+
+	// Track the top N metrics by cardinality and export a gauge per tracked metric.
+	// A value <= 0 disables tracking.
+	TopMetricsToTrack int `yaml:"topMetricsToTrack"`
+	// Only report metrics whose cardinality is >= minCardinalityToTrack.
+	MinCardinalityToTrack int `yaml:"minCardinalityToTrack" validate:"min=10"`
+	// Cap the metric-name-to-cardinality map at maxMapLenForTracking entries to avoid unbounded memory usage.
+	MaxMapLenForTracking int `yaml:"maxMapLenForTracking" validate:"min=10"`
+	// Report the top metrics once every N ticks.
+	TopMetricsTrackingTicks int `yaml:"topMetricsTrackingTicks" validate:"min=1"`
 }
 
 // BlockRetrievePolicy is the block retrieve policy.
diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go
index b1b856c4b4..ae126325bc 100644
--- a/src/dbnode/server/server.go
+++ b/src/dbnode/server/server.go
@@ -582,9 +582,23 @@ func Run(runOpts RunOptions) {
 
 	if tick := cfg.Tick; tick != nil {
 		runtimeOpts = runtimeOpts.
-			SetTickSeriesBatchSize(tick.SeriesBatchSize).
-			SetTickPerSeriesSleepDuration(tick.PerSeriesSleepDuration).
 			SetTickMinimumInterval(tick.MinimumInterval)
+		if tick.SeriesBatchSize > 0 {
+			runtimeOpts = runtimeOpts.SetTickSeriesBatchSize(tick.SeriesBatchSize)
+		}
+		if tick.PerSeriesSleepDuration > 0 {
+			runtimeOpts = runtimeOpts.SetTickPerSeriesSleepDuration(tick.PerSeriesSleepDuration)
+		}
+
+		logger.Info("Setting up tick configuration", zap.Int("TopMetricsToTrack", tick.TopMetricsToTrack))
+		opts = opts.SetTickOptions(
+			storage.TickOptions{
+				TopMetricsToTrack:       tick.TopMetricsToTrack,
+				MinCardinalityToTrack:   tick.MinCardinalityToTrack,
+				MaxMapLenForTracking:    tick.MaxMapLenForTracking,
+				TopMetricsTrackingTicks: tick.TopMetricsTrackingTicks,
+			},
+		)
 	}
 
 	runtimeOptsMgr := m3dbruntime.NewOptionsManager()
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 67e46004c6..73415c4368 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -145,7 +145,10 @@ type dbNamespace struct {
 	tickWorkersConcurrency int
 	statsLastTick          databaseNamespaceStatsLastTick
 
-	metrics databaseNamespaceMetrics
+	metrics               databaseNamespaceMetrics
+	tickOptions           TickOptions
+	tickSeqNo             int64 // The sequence number of the current tick.
+	shouldTrackTopMetrics bool
 }
 
 type databaseNamespaceStatsLastTick struct {
@@ -201,6 +204,7 @@ type databaseNamespaceShardMetrics struct {
 
 type databaseNamespaceTickMetrics struct {
 	activeSeries           tally.Gauge
+	metricCardinality      tally.Scope // Holds one gauge per tracked metric name.
 	expiredSeries          tally.Counter
 	activeBlocks           tally.Gauge
 	wiredBlocks            tally.Gauge
@@ -212,6 +216,8 @@ type databaseNamespaceTickMetrics struct {
 	errors                 tally.Counter
 	index                  databaseNamespaceIndexTickMetrics
 	evictedBuckets         tally.Counter
+	// How long the tick took, in nanoseconds.
+	tickDurationNanos      tally.Gauge
 }
 
 type databaseNamespaceIndexTickMetrics struct {
@@ -277,6 +283,7 @@ func newDatabaseNamespaceMetrics(
 		},
 		tick: databaseNamespaceTickMetrics{
 			activeSeries:      tickScope.Gauge("active-series"),
+			metricCardinality: tickScope.SubScope("top_metric_"), // Emits "top_metric__xyz" for metric "xyz".
 			expiredSeries:     tickScope.Counter("expired-series"),
 			activeBlocks:      tickScope.Gauge("active-blocks"),
 			wiredBlocks:       tickScope.Gauge("wired-blocks"),
@@ -294,6 +301,7 @@ func newDatabaseNamespaceMetrics(
 				numBlocksEvicted: indexTickScope.Counter("num-blocks-evicted"),
 			},
 			evictedBuckets: tickScope.Counter("evicted-buckets"),
+			tickDurationNanos: tickScope.Gauge("metric-tracking-duration-nano"),
 		},
 		status: databaseNamespaceStatusMetrics{
 			activeSeries: statusScope.Gauge("active-series"),
@@ -385,6 +393,9 @@ func newDatabaseNamespace(
 		tickWorkers:            tickWorkers,
 		tickWorkersConcurrency: tickWorkersConcurrency,
 		metrics:                newDatabaseNamespaceMetrics(scope, iops.TimerOptions()),
+		tickOptions:            opts.TickOptions(),
+		tickSeqNo:              0,
+		shouldTrackTopMetrics:  opts.TickOptions().TopMetricsToTrack > 0 && opts.TickOptions().MaxMapLenForTracking > 0 && opts.TickOptions().TopMetricsTrackingTicks > 0,
 	}
 
 	n.createEmptyWarmIndexIfNotExistsFn = n.createEmptyWarmIndexIfNotExists
@@ -626,11 +637,22 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
 
 	// Tick through the shards at a capped level of concurrency.
 	var (
-		r        tickResult
-		multiErr xerrors.MultiError
-		l        sync.Mutex
-		wg       sync.WaitGroup
+		r           tickResult
+		multiErr    xerrors.MultiError
+		l           sync.Mutex
+		wg          sync.WaitGroup
+		tickOptions = TickOptions{TopMetricsToTrack: 0} // Track nothing unless this tick is selected below.
 	)
+	n.tickSeqNo++
+	tickStartTimeNano := xtime.Now()
+	if n.shouldTrackTopMetrics && (n.tickSeqNo%int64(n.tickOptions.TopMetricsTrackingTicks) == 0) {
+		tickOptions = n.tickOptions
+		r.trackTopMetrics()
+		n.log.Info("Will track top metrics for this tick.",
+			zap.Int("TopMetricsTrackingTicks", n.tickOptions.TopMetricsTrackingTicks),
+			zap.Int64("tickSeqNo", n.tickSeqNo),
+		)
+	}
 	for _, shard := range shards {
 		shard := shard
 		wg.Add(1)
@@ -640,11 +662,10 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
 			if c.IsCancelled() {
 				return
 			}
-
-			shardResult, err := shard.Tick(c, startTime, nsCtx)
+			shardResult, err := shard.Tick(c, startTime, nsCtx, tickOptions)
 			l.Lock()
-			r = r.merge(shardResult)
+			r.merge(shardResult, tickOptions.TopMetricsToTrack)
 			multiErr = multiErr.Add(err)
 			l.Unlock()
 		})
 	}
@@ -667,6 +688,7 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
 	// NB: we early terminate here to ensure we are not reporting metrics
 	// based on in-accurate/partial tick results.
 	if err := multiErr.FinalError(); err != nil || c.IsCancelled() {
+		n.log.Debug("tick cancelled", zap.Error(err))
 		return err
 	}
 
@@ -696,6 +718,14 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
 	n.metrics.tick.index.numBlocksEvicted.Inc(indexTickResults.NumBlocksEvicted)
 	n.metrics.tick.index.numBlocksSealed.Inc(indexTickResults.NumBlocksSealed)
 	n.metrics.tick.errors.Inc(int64(r.errors))
+	for _, metric := range r.metricToCardinality {
+		if metric.cardinality >= tickOptions.MinCardinalityToTrack {
+			// If the gauge does not exist yet, it is created on the fly.
+			n.metrics.tick.metricCardinality.Gauge(string(metric.name)).Update(float64(metric.cardinality))
+		}
+	}
+	tickDurationNano := xtime.Since(tickStartTimeNano).Nanoseconds()
+	n.metrics.tick.tickDurationNanos.Update(float64(tickDurationNano))
 
 	return nil
 }
diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go
index 7993c5a58f..0c4212a80b 100644
--- a/src/dbnode/storage/options.go
+++ b/src/dbnode/storage/options.go
@@ -176,6 +176,7 @@ type options struct {
 	permitsOptions          permits.Options
 	limitsOptions           limits.Options
 	coreFn                  xsync.CoreFn
+	tickOptions             TickOptions
 }
 
 // NewOptions creates a new set of storage options with defaults.
@@ -934,6 +935,16 @@ func (o *options) SetCoreFn(value xsync.CoreFn) Options {
 	return &opts
 }
 
+func (o *options) SetTickOptions(value TickOptions) Options {
+	opts := *o
+	opts.tickOptions = value
+	return &opts
+}
+
+func (o *options) TickOptions() TickOptions {
+	return o.tickOptions
+}
+
 type noOpColdFlush struct{}
 
 func (n *noOpColdFlush) ColdFlushNamespace(Namespace, ColdFlushNsOpts) (OnColdFlushNamespace, error) {
diff --git a/src/dbnode/storage/result.go b/src/dbnode/storage/result.go
index 58b5e0363f..3c75bd568e 100644
--- a/src/dbnode/storage/result.go
+++ b/src/dbnode/storage/result.go
@@ -20,6 +20,27 @@
 
 package storage
 
+import (
+	"hash/fnv"
+	"sort"
+)
+
+func getHash(b []byte) uint64 {
+	hash := fnv.New64a()
+	hash.Write(b)
+	return hash.Sum64()
+}
+
+type metricCardinality struct {
+	name        []byte
+	cardinality int
+}
+func newMetricCardinality(name []byte, cardinality int) (uint64, *metricCardinality) {
+	return getHash(name), &metricCardinality{
+		name:        name,
+		cardinality: cardinality,
+	}
+}
 type tickResult struct {
 	activeSeries           int
 	expiredSeries          int
@@ -32,20 +53,78 @@ type tickResult struct {
 	mergedOutOfOrderBlocks int
 	errors                 int
 	evictedBuckets         int
+	// The key is the hash value of the metric name.
+	metricToCardinality map[uint64]*metricCardinality
+}
+
+func (r *tickResult) trackTopMetrics() {
+	r.metricToCardinality = make(map[uint64]*metricCardinality)
+}
+
+func (r *tickResult) truncateTopMetrics(topN int) {
+	if topN <= 0 {
+		return
+	}
+	if r.metricToCardinality == nil || len(r.metricToCardinality) <= topN {
+		return
+	}
+	// TODO: use a heap to optimize this.
+	cardinalities := make([]int, 0, len(r.metricToCardinality))
+	for _, metric := range r.metricToCardinality {
+		cardinalities = append(cardinalities, metric.cardinality)
+	}
+	sort.Sort(sort.Reverse(sort.IntSlice(cardinalities)))
+	cutoffValue := cardinalities[topN-1]
+	cutoffValueQuota := 1
+	for i := topN - 2; i >= 0; i-- {
+		if cardinalities[i] == cutoffValue {
+			cutoffValueQuota++
+		} else {
+			break
+		}
+	}
+	for hash, metric := range r.metricToCardinality {
+		if metric.cardinality < cutoffValue {
+			delete(r.metricToCardinality, hash)
+		} else if metric.cardinality == cutoffValue {
+			if cutoffValueQuota > 0 {
+				cutoffValueQuota--
+			} else {
+				delete(r.metricToCardinality, hash)
+			}
+		}
+	}
 }
 
-func (r tickResult) merge(other tickResult) tickResult {
-	return tickResult{
-		activeSeries:           r.activeSeries + other.activeSeries,
-		expiredSeries:          r.expiredSeries + other.expiredSeries,
-		activeBlocks:           r.activeBlocks + other.activeBlocks,
-		wiredBlocks:            r.wiredBlocks + other.wiredBlocks,
-		pendingMergeBlocks:     r.pendingMergeBlocks + other.pendingMergeBlocks,
-		unwiredBlocks:          r.unwiredBlocks + other.unwiredBlocks,
-		madeExpiredBlocks:      r.madeExpiredBlocks + other.madeExpiredBlocks,
-		madeUnwiredBlocks:      r.madeUnwiredBlocks + other.madeUnwiredBlocks,
-		mergedOutOfOrderBlocks: r.mergedOutOfOrderBlocks + other.mergedOutOfOrderBlocks,
-		errors:                 r.errors + other.errors,
-		evictedBuckets:         r.evictedBuckets + other.evictedBuckets,
+// NB: this method modifies the receiver in-place.
+func (r *tickResult) merge(other tickResult, topN int) {
+	r.activeSeries += other.activeSeries
+	r.expiredSeries += other.expiredSeries
+	r.activeBlocks += other.activeBlocks
+	r.wiredBlocks += other.wiredBlocks
+	r.pendingMergeBlocks += other.pendingMergeBlocks
+	r.unwiredBlocks += other.unwiredBlocks
+	r.madeExpiredBlocks += other.madeExpiredBlocks
+	r.madeUnwiredBlocks += other.madeUnwiredBlocks
+	r.mergedOutOfOrderBlocks += other.mergedOutOfOrderBlocks
+	r.errors += other.errors
+	r.evictedBuckets += other.evictedBuckets
+
+	if other.metricToCardinality == nil {
+		return
+	}
+	if r.metricToCardinality == nil {
+		r.metricToCardinality = other.metricToCardinality
+		return
 	}
+
+	for hash, otherMetric := range other.metricToCardinality {
+		if currentMetric, ok := r.metricToCardinality[hash]; ok {
+			currentMetric.cardinality += otherMetric.cardinality
+		} else {
+			r.metricToCardinality[hash] = otherMetric
+		}
+	}
+
+	r.truncateTopMetrics(topN)
 }
diff --git a/src/dbnode/storage/result_test.go b/src/dbnode/storage/result_test.go
new file mode 100644
index 0000000000..8f01ddf984
--- /dev/null
+++ b/src/dbnode/storage/result_test.go
@@ -0,0 +1,38 @@
+// Copyright (c) 2023 Databricks, Inc.
+//
+
+package storage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestMerge(t *testing.T) {
+	var a, b tickResult
+	a.metricToCardinality = map[uint64]*metricCardinality{
+		0: {name: []byte("a"), cardinality: 1},
+		1: {name: []byte("b"), cardinality: 101},
+	}
+	b.metricToCardinality = map[uint64]*metricCardinality{
+		1: {name: []byte("b"), cardinality: 1000},
+		2: {name: []byte("c"), cardinality: 101},
+	}
+	a.merge(b, 3)
+	require.Equal(t, 3, len(a.metricToCardinality))
+	require.Equal(t, 2, len(b.metricToCardinality))
+	require.Equal(t, 1, a.metricToCardinality[0].cardinality)
+	require.Equal(t, 1101, a.metricToCardinality[1].cardinality)
+	require.Equal(t, 101, a.metricToCardinality[2].cardinality)
+
+	a.metricToCardinality = map[uint64]*metricCardinality{
+		0: {name: []byte("a"), cardinality: 1},
+		1: {name: []byte("b"), cardinality: 101},
+	}
+	require.Equal(t, 2, len(a.metricToCardinality))
+	a.merge(b, 2)
+	require.Equal(t, 2, len(a.metricToCardinality))
+	require.Equal(t, 1101, a.metricToCardinality[1].cardinality)
+	require.Equal(t, 101, a.metricToCardinality[2].cardinality)
+}
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index 3a95823e8d..7f9d806d03 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -65,6 +65,7 @@ import (
 const (
 	shardIterateBatchPercent = 0.01
 	shardIterateBatchMinSize = 16
+	metricLabelName          = "__name__"
 )
 
 var (
@@ -651,7 +652,7 @@ func (s *dbShard) Close() error {
 	// causes the GC to impact performance when closing shards the deadline
 	// should be increased.
 	cancellable := context.NewNoOpCanncellable()
-	_, err := s.tickAndExpire(cancellable, tickPolicyCloseShard, namespace.Context{})
+	_, err := s.tickAndExpire(cancellable, tickPolicyCloseShard, namespace.Context{}, TickOptions{TopMetricsToTrack: 0})
 	return err
 }
 
@@ -670,15 +671,21 @@ func (s *dbShard) isClosingWithLock() bool {
 	return s.state == dbShardStateClosing
 }
 
-func (s *dbShard) Tick(c context.Cancellable, startTime xtime.UnixNano, nsCtx namespace.Context) (tickResult, error) {
+func (s *dbShard) Tick(
+	c context.Cancellable,
+	startTime xtime.UnixNano,
+	nsCtx namespace.Context,
+	tickOptions TickOptions,
+) (tickResult, error) {
 	s.removeAnyFlushStatesTooEarly(startTime)
-	return s.tickAndExpire(c, tickPolicyRegular, nsCtx)
+	return s.tickAndExpire(c, tickPolicyRegular, nsCtx, tickOptions)
 }
 
 func (s *dbShard) tickAndExpire(
 	c context.Cancellable,
 	policy tickPolicy,
 	nsCtx namespace.Context,
+	tickOptions TickOptions,
 ) (tickResult, error) {
 	s.Lock()
 	// ensure only one tick can execute at a time
@@ -711,7 +718,7 @@ func (s *dbShard) tickAndExpire(
 	}()
 
 	var (
-		r                             tickResult
+		r                             = &tickResult{}
 		terminatedTickingDueToClosing bool
 		i                             int
 		slept                         time.Duration
@@ -726,6 +733,16 @@ func (s *dbShard) tickAndExpire(
 	// future read lock attempts.
 	blockStates := s.blockStatesSnapshotWithRLock()
 	s.RUnlock()
+	topMetricsToTrack := tickOptions.TopMetricsToTrack
+	maxMapLenForTracking := tickOptions.MaxMapLenForTracking
+	shouldTrackTopMetrics := topMetricsToTrack > 0 && maxMapLenForTracking > 0
+
+	if shouldTrackTopMetrics {
+		// Make 'r' ready to track top metrics.
+		r.trackTopMetrics()
+	}
+
+	// NB: no lock is held when the func is called. See the implementation of forEachShardEntryBatch().
 	s.forEachShardEntryBatch(func(currEntries []*Entry) bool {
 		// re-using `expired` to amortize allocs, still need to reset it
 		// to be safe for re-use.
@@ -774,6 +791,18 @@ func (s *dbShard) tickAndExpire(
 			if err != nil {
 				r.errors++
 			}
+			if shouldTrackTopMetrics {
+				// TODO: find a cheaper way to get the metric name; Get() iterates over all labels.
+				if metricNameBytes, ok := entry.Series.Metadata().Get([]byte(metricLabelName)); ok {
+					nameHash, metric := newMetricCardinality(metricNameBytes, 1)
+					// We don't handle hash collisions here, since we are not looking for precise top metrics/cardinality.
+					if currentMetric, ok := r.metricToCardinality[nameHash]; ok {
+						currentMetric.cardinality++
+					} else if len(r.metricToCardinality) < maxMapLenForTracking {
+						r.metricToCardinality[nameHash] = metric
+					}
+				}
+			}
 		}
 		r.activeBlocks += result.ActiveBlocks
 		r.wiredBlocks += result.WiredBlocks
@@ -783,6 +812,7 @@ func (s *dbShard) tickAndExpire(
 		r.madeUnwiredBlocks += result.MadeUnwiredBlocks
 		r.mergedOutOfOrderBlocks += result.MergedOutOfOrderBlocks
 		r.evictedBuckets += result.EvictedBuckets
+		r.truncateTopMetrics(topMetricsToTrack)
 		i++
 	}
 
@@ -799,10 +829,11 @@ func (s *dbShard) tickAndExpire(
 	})
 
 	if terminatedTickingDueToClosing {
+		s.logger.Debug("Returning empty tick result due to closing")
 		return tickResult{}, errShardClosingTickTerminated
 	}
 
-	return r, nil
+	return *r, nil
 }
 
 // NB(prateek): purgeExpiredSeries requires that all entries passed to it have at least one reader/writer,
diff --git a/src/dbnode/storage/shard_race_prop_test.go b/src/dbnode/storage/shard_race_prop_test.go
index 9abb85c8d7..b48edb1c4f 100644
--- a/src/dbnode/storage/shard_race_prop_test.go
+++ b/src/dbnode/storage/shard_race_prop_test.go
@@ -82,7 +82,7 @@ func testShardTickReadFnRace(t *testing.T, ids []ident.ID, tickBatchSize int, fn
 	wg.Add(2)
 
 	go func() {
-		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}, TickOptions{})
 		require.NoError(t, err)
 		wg.Done()
 	}()
@@ -205,7 +205,7 @@ func testShardTickWriteRace(t *testing.T, tickBatchSize, numSeries int) {
 	go func() {
 		defer doneFn()
 		<-barrier
-		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}, TickOptions{})
 		assert.NoError(t, err)
 	}()
 
@@ -303,7 +303,7 @@ func TestShardTickBootstrapWriteRace(t *testing.T) {
 	go func() {
 		defer doneFn()
 		<-barrier
-		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}, TickOptions{})
 		assert.NoError(t, err)
 	}()
 
diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go
index 2064527869..7ef8923b6a 100644
--- a/src/dbnode/storage/shard_test.go
+++ b/src/dbnode/storage/shard_test.go
@@ -926,6 +926,11 @@ func writeShardAndVerify(
 	assert.Equal(t, expectedIdx, seriesWrite.Series.UniqueIndex)
 }
 
+func runTick(shard *dbShard, nowTs xtime.UnixNano) (tickResult, error) {
+	return shard.Tick(context.NewNoOpCanncellable(), nowTs, namespace.Context{},
+		TickOptions{})
+}
+
 func TestShardTick(t *testing.T) {
 	dir, err := ioutil.TempDir("", "testdir")
 	require.NoError(t, err)
@@ -1016,7 +1021,7 @@ func TestShardTick(t *testing.T) {
 	// same time, same value should not write, regardless of being out of order
 	writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, false, 0)
 
-	r, err := shard.Tick(context.NewNoOpCanncellable(), nowFn(), namespace.Context{})
+	r, err := runTick(shard, nowFn())
 	require.NoError(t, err)
 	require.Equal(t, 3, r.activeSeries)
 	require.Equal(t, 0, r.expiredSeries)
@@ -1190,7 +1195,7 @@ func testShardWriteAsync(t *testing.T, writes []testWrite) {
 		time.Sleep(10 * time.Millisecond)
 	}
 
-	r, err := shard.Tick(context.NewNoOpCanncellable(), xtime.ToUnixNano(nowFn()), namespace.Context{})
+	r, err := runTick(shard, xtime.ToUnixNano(nowFn()))
 	require.NoError(t, err)
 	require.Equal(t, len(writes), r.activeSeries)
 	require.Equal(t, 0, r.expiredSeries)
@@ -1232,12 +1237,12 @@ func TestShardTickRace(t *testing.T) {
 	wg.Add(2)
 
 	go func() {
-		shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}) // nolint
+		runTick(shard, xtime.Now()) // nolint
 		wg.Done()
 	}()
 
 	go func() {
-		shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}) // nolint
+		runTick(shard, xtime.Now()) // nolint
 		wg.Done()
 	}()
 
@@ -1263,7 +1268,7 @@ func TestShardTickCleanupSmallBatchSize(t *testing.T) {
 	shard.Bootstrap(ctx, nsCtx)
 
 	addTestSeries(shard, ident.StringID("foo"))
-	_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+	_, err := runTick(shard, xtime.Now())
 	require.NoError(t, err)
 	require.Equal(t, 0, shard.lookup.Len())
 }
@@ -1311,7 +1316,7 @@ func TestShardReturnsErrorForConcurrentTicks(t *testing.T) {
 	}).Return(series.TickResult{}, nil)
 
 	go func() {
-		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+		_, err := runTick(shard, xtime.Now())
 		if err != nil {
 			panic(err)
 		}
@@ -1320,7 +1325,7 @@ func TestShardReturnsErrorForConcurrentTicks(t *testing.T) {
 
 	go func() {
 		tick1Wg.Wait()
-		_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+		_, err := runTick(shard, xtime.Now())
 		require.Error(t, err)
 		tick2Wg.Done()
 		closeWg.Done()
@@ -1384,7 +1389,7 @@ func TestShardTicksStopWhenClosing(t *testing.T) {
 	closeWg.Add(2)
 
 	go func() {
-		shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{}) // nolint
+		runTick(shard, xtime.Now()) // nolint
 		closeWg.Done()
 	}()
 
@@ -1405,7 +1410,7 @@ func TestPurgeExpiredSeriesEmptySeries(t *testing.T) {
 
 	addTestSeries(shard, ident.StringID("foo"))
 
-	_, err := shard.Tick(context.NewNoOpCanncellable(), xtime.Now(), namespace.Context{})
+	_, err := runTick(shard, xtime.Now())
 	require.NoError(t, err)
 
 	shard.RLock()
@@ -1430,7 +1435,7 @@ func TestPurgeExpiredSeriesNonEmptySeries(t *testing.T) {
 		1.0, xtime.Second, nil, series.WriteOptions{})
 	require.NoError(t, err)
 
-	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{})
+	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{}, TickOptions{})
 	require.NoError(t, err)
 	require.Equal(t, 1, r.activeSeries)
 	require.Equal(t, 0, r.expiredSeries)
@@ -1462,7 +1467,7 @@ func TestPurgeExpiredSeriesWriteAfterTicking(t *testing.T) {
 		require.NoError(t, err)
 	}).Return(series.TickResult{}, series.ErrSeriesAllDatapointsExpired)
 
-	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{})
+	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{}, TickOptions{})
 	require.NoError(t, err)
 	require.Equal(t, 0, r.activeSeries)
 	require.Equal(t, 1, r.expiredSeries)
@@ -1490,7 +1495,7 @@ func TestPurgeExpiredSeriesWriteAfterPurging(t *testing.T) {
 		require.NoError(t, err)
 	}).Return(series.TickResult{}, series.ErrSeriesAllDatapointsExpired)
 
-	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{})
+	r, err := shard.tickAndExpire(context.NewNoOpCanncellable(), tickPolicyRegular, namespace.Context{}, TickOptions{})
 	require.NoError(t, err)
 	require.Equal(t, 0, r.activeSeries)
 	require.Equal(t, 1, r.expiredSeries)
diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go
index ba9ddb4ee4..c0891c6030 100644
--- a/src/dbnode/storage/storage_mock.go
+++ b/src/dbnode/storage/storage_mock.go
@@ -2290,7 +2290,7 @@ func (mr *MockdatabaseShardMockRecorder) Snapshot(blockStart, snapshotStart, flu
 }
 
 // Tick mocks base method.
-func (m *MockdatabaseShard) Tick(c context.Cancellable, startTime time0.UnixNano, nsCtx namespace.Context) (tickResult, error) {
+func (m *MockdatabaseShard) Tick(c context.Cancellable, startTime time0.UnixNano, nsCtx namespace.Context, tickOptions TickOptions) (tickResult, error) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Tick", c, startTime, nsCtx)
 	ret0, _ := ret[0].(tickResult)
@@ -3869,6 +3869,15 @@ func NewMockOptions(ctrl *gomock.Controller) *MockOptions {
 	return mock
}
 
+// SetTickOptions is a no-op on the mock that returns the mock itself.
+func (m *MockOptions) SetTickOptions(TickOptions) Options {
+	return m
+}
+
+func (m *MockOptions) TickOptions() TickOptions {
+	return TickOptions{}
+}
+
 // EXPECT returns an object that allows the caller to indicate expected use.
 func (m *MockOptions) EXPECT() *MockOptionsMockRecorder {
 	return m.recorder
diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go
index 739609e6b4..88442d97f8 100644
--- a/src/dbnode/storage/types.go
+++ b/src/dbnode/storage/types.go
@@ -479,7 +479,7 @@ type databaseShard interface {
 	OnEvictedFromWiredList(id ident.ID, blockStart xtime.UnixNano)
 
 	// Tick performs all async updates
-	Tick(c context.Cancellable, startTime xtime.UnixNano, nsCtx namespace.Context) (tickResult, error)
+	Tick(c context.Cancellable, startTime xtime.UnixNano, nsCtx namespace.Context, tickOptions TickOptions) (tickResult, error)
 
 	// Write writes a value to the shard for an ID.
 	Write(
@@ -1010,6 +1010,14 @@ type OnColdFlushNamespace interface {
 // OptionTransform transforms given Options.
 type OptionTransform func(Options) Options
 
+// TickOptions controls top-metric cardinality tracking during ticks.
+type TickOptions struct {
+	TopMetricsToTrack       int
+	MinCardinalityToTrack   int
+	MaxMapLenForTracking    int
+	TopMetricsTrackingTicks int
+}
+
 // Options represents the options for storage.
 type Options interface {
 	// Validate validates assumptions baked into the code.
@@ -1344,6 +1352,12 @@ type Options interface {
 
 	// SetCoreFn sets the function for determining the current core.
 	SetCoreFn(value xsync.CoreFn) Options
+
+	// SetTickOptions sets the tick options.
+	SetTickOptions(value TickOptions) Options
+
+	// TickOptions returns the tick options.
+	TickOptions() TickOptions
 }
 
 // MemoryTracker tracks memory.
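-- 

Reviewer note, not part of the patch: the four new knobs live in the dbnode tick
configuration. A minimal sketch of how they could be set, assuming the usual
"db" / "tick" layout of the m3dbnode YAML config; the values are illustrative
and chosen to satisfy the validate tags above:

db:
  tick:
    topMetricsToTrack: 20        # export gauges for the 20 highest-cardinality metric names
    minCardinalityToTrack: 100   # only report names with >= 100 series (validate: min=10)
    maxMapLenForTracking: 10000  # cap on tracked names per shard tick (validate: min=10)
    topMetricsTrackingTicks: 10  # do the tracking work once every 10 ticks (validate: min=1)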
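A possible follow-up for the "use a heap to optimize this" TODO in
truncateTopMetrics: keep a min-heap of at most topN cardinalities instead of
sorting all of them, which drops the cost from O(M log M) to O(M log topN) for
M tracked metrics. The sketch below is illustrative only and is not part of
the patch; cardinalityHeap and topNCutoff are hypothetical names, and it
assumes 0 < topN <= len(cardinalities).

package main

import (
	"container/heap"
	"fmt"
)

// cardinalityHeap is a min-heap of cardinalities: the root is the smallest
// value currently kept, i.e. the cutoff candidate.
type cardinalityHeap []int

func (h cardinalityHeap) Len() int            { return len(h) }
func (h cardinalityHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h cardinalityHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *cardinalityHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *cardinalityHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// topNCutoff returns the smallest cardinality that survives truncation to the
// topN highest values.
func topNCutoff(cardinalities []int, topN int) int {
	h := &cardinalityHeap{}
	for _, c := range cardinalities {
		if h.Len() < topN {
			heap.Push(h, c)
		} else if c > (*h)[0] {
			// c beats the current minimum: replace the root and restore heap order.
			(*h)[0] = c
			heap.Fix(h, 0)
		}
	}
	return (*h)[0]
}

func main() {
	// The three largest values are 1101, 101 and 42, so the cutoff is 42.
	fmt.Println(topNCutoff([]int{1, 101, 1101, 7, 42}, 3))
}

With the cutoff in hand, the same delete-with-tie-quota loop already in the
patch would apply unchanged.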