[m3dbnode] Export gauges for metrics with highest active series #4217

Open
wants to merge 1 commit into base: master
10 changes: 10 additions & 0 deletions src/cmd/services/m3dbnode/config/config.go
@@ -495,6 +495,16 @@ type TickConfiguration struct {

// Tick minimum interval controls the minimum tick interval for the node.
MinimumInterval time.Duration `yaml:"minimumInterval"`

// TopMetricsToTrack is the number of highest-cardinality metrics to track and
// export gauges for. A value <= 0 disables tracking.
TopMetricsToTrack int `yaml:"topMetricsToTrack"`
// MinCardinalityToTrack only reports metrics whose cardinality is at least this value.
MinCardinalityToTrack int `yaml:"minCardinalityToTrack" validate:"min=10"`
// MaxMapLenForTracking caps the size of the metric-name-to-cardinality map to avoid unbounded memory usage.
MaxMapLenForTracking int `yaml:"maxMapLenForTracking" validate:"min=10"`
// TopMetricsTrackingTicks controls how often the top metrics are reported: once every N ticks.
TopMetricsTrackingTicks int `yaml:"topMetricsTrackingTicks" validate:"min=1"`
}

// BlockRetrievePolicy is the block retrieve policy.
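For reviewers who want to try these knobs, here is a minimal sketch of populating the new fields in Go. The values and the import path github.com/m3db/m3/src/cmd/services/m3dbnode/config are assumptions for illustration, not recommended defaults; the equivalent YAML keys (topMetricsToTrack, minCardinalityToTrack, maxMapLenForTracking, topMetricsTrackingTicks) would presumably sit under the existing tick section of the node config.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/cmd/services/m3dbnode/config"
)

func main() {
	// Illustrative values only, not recommended defaults. Other TickConfiguration
	// fields (SeriesBatchSize, PerSeriesSleepDuration, MinimumInterval) keep their
	// zero values here.
	tick := config.TickConfiguration{
		TopMetricsToTrack:       20,    // export gauges for the 20 highest-cardinality metrics
		MinCardinalityToTrack:   100,   // only report metrics with at least 100 active series
		MaxMapLenForTracking:    10000, // cap the tracking map to bound memory usage
		TopMetricsTrackingTicks: 10,    // report once every 10 ticks
	}
	fmt.Printf("%+v\n", tick)
}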
18 changes: 16 additions & 2 deletions src/dbnode/server/server.go
@@ -582,9 +582,23 @@ func Run(runOpts RunOptions) {

if tick := cfg.Tick; tick != nil {
runtimeOpts = runtimeOpts.
SetTickSeriesBatchSize(tick.SeriesBatchSize).
SetTickPerSeriesSleepDuration(tick.PerSeriesSleepDuration).
SetTickMinimumInterval(tick.MinimumInterval)
if tick.SeriesBatchSize > 0 {
runtimeOpts = runtimeOpts.SetTickSeriesBatchSize(tick.SeriesBatchSize)
}
if tick.PerSeriesSleepDuration > 0 {
runtimeOpts = runtimeOpts.SetTickPerSeriesSleepDuration(tick.PerSeriesSleepDuration)
}

logger.Info("Setting up tick configuration", zap.Int("TopMetricsToTrack", cfg.Tick.TopMetricsToTrack))
opts = opts.SetTickOptions(
storage.TickOptions{
TopMetricsToTrack: tick.TopMetricsToTrack,
MinCardinalityToTrack: tick.MinCardinalityToTrack,
MaxMapLenForTracking: tick.MaxMapLenForTracking,
TopMetricsTrackingTicks: tick.TopMetricsTrackingTicks,
},
)
}

runtimeOptsMgr := m3dbruntime.NewOptionsManager()
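One point worth calling out: SetTickOptions is applied whenever cfg.Tick is present, and the actual on/off decision happens later in newDatabaseNamespace. A sketch restating that predicate, assuming storage.TickOptions is exported from github.com/m3db/m3/src/dbnode/storage as used above; the helper name and package are hypothetical.

package example

import "github.com/m3db/m3/src/dbnode/storage"

// topMetricTrackingEnabled restates the shouldTrackTopMetrics condition from
// newDatabaseNamespace (further down in this diff): tracking runs only when all
// three knobs are positive. MinCardinalityToTrack is not part of the on/off
// decision; it only filters which gauges are reported at the end of a tick.
func topMetricTrackingEnabled(o storage.TickOptions) bool {
	return o.TopMetricsToTrack > 0 &&
		o.MaxMapLenForTracking > 0 &&
		o.TopMetricsTrackingTicks > 0
}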
46 changes: 38 additions & 8 deletions src/dbnode/storage/namespace.go
@@ -145,7 +145,10 @@ type dbNamespace struct {
tickWorkersConcurrency int
statsLastTick databaseNamespaceStatsLastTick

metrics databaseNamespaceMetrics
metrics databaseNamespaceMetrics
tickOptions TickOptions
tickSeqNo int64 // The sequence number of the current tick.
shouldTrackTopMetrics bool
}

type databaseNamespaceStatsLastTick struct {
@@ -201,6 +204,7 @@ type databaseNamespaceShardMetrics struct {

type databaseNamespaceTickMetrics struct {
activeSeries tally.Gauge
metricCardinality tally.Scope // holds multiple gauges.
expiredSeries tally.Counter
activeBlocks tally.Gauge
wiredBlocks tally.Gauge
@@ -212,6 +216,8 @@ type databaseNamespaceTickMetrics struct {
errors tally.Counter
index databaseNamespaceIndexTickMetrics
evictedBuckets tally.Counter
// tickDurationNano records how many nanoseconds a metric-tracking tick took.
tickDurationNano tally.Gauge
}

type databaseNamespaceIndexTickMetrics struct {
@@ -277,6 +283,7 @@ func newDatabaseNamespaceMetrics(
},
tick: databaseNamespaceTickMetrics{
activeSeries: tickScope.Gauge("active-series"),
metricCardinality: tickScope.SubScope("top_metric_"), // "top_metric__xyz" for metric "xyz"
expiredSeries: tickScope.Counter("expired-series"),
activeBlocks: tickScope.Gauge("active-blocks"),
wiredBlocks: tickScope.Gauge("wired-blocks"),
@@ -294,6 +301,7 @@
numBlocksEvicted: indexTickScope.Counter("num-blocks-evicted"),
},
evictedBuckets: tickScope.Counter("evicted-buckets"),
tickDurationNano: tickScope.Gauge("metric-tracking-duration-nano"),
},
status: databaseNamespaceStatusMetrics{
activeSeries: statusScope.Gauge("active-series"),
@@ -385,6 +393,9 @@ func newDatabaseNamespace(
tickWorkers: tickWorkers,
tickWorkersConcurrency: tickWorkersConcurrency,
metrics: newDatabaseNamespaceMetrics(scope, iops.TimerOptions()),
tickOptions: opts.TickOptions(),
tickSeqNo: 0,
shouldTrackTopMetrics: opts.TickOptions().TopMetricsToTrack > 0 && opts.TickOptions().MaxMapLenForTracking > 0 && opts.TickOptions().TopMetricsTrackingTicks > 0,
}

n.createEmptyWarmIndexIfNotExistsFn = n.createEmptyWarmIndexIfNotExists
@@ -626,11 +637,22 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro

// Tick through the shards at a capped level of concurrency.
var (
r tickResult
multiErr xerrors.MultiError
l sync.Mutex
wg sync.WaitGroup
r tickResult
multiErr xerrors.MultiError
l sync.Mutex
wg sync.WaitGroup
tickOptions = TickOptions{TopMetricsToTrack: 0}
)
n.tickSeqNo++
tickStartTimeNano := xtime.Now()
if n.shouldTrackTopMetrics && (n.tickSeqNo%int64(n.tickOptions.TopMetricsTrackingTicks) == 0) {
tickOptions = n.tickOptions
r.trackTopMetrics()
n.log.Info("Will track top metrics for this tick.",
zap.Int("TopMetricsTrackingTicks", n.tickOptions.TopMetricsTrackingTicks),
zap.Int64("tickSeqNo", n.tickSeqNo),
)
}
for _, shard := range shards {
shard := shard
wg.Add(1)
@@ -640,11 +662,10 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
if c.IsCancelled() {
return
}

shardResult, err := shard.Tick(c, startTime, nsCtx)
shardResult, err := shard.Tick(c, startTime, nsCtx, tickOptions)

l.Lock()
r = r.merge(shardResult)
r.merge(shardResult, tickOptions.TopMetricsToTrack)
multiErr = multiErr.Add(err)
l.Unlock()
})
@@ -667,6 +688,7 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
// NB: we early terminate here to ensure we are not reporting metrics
// based on in-accurate/partial tick results.
if err := multiErr.FinalError(); err != nil || c.IsCancelled() {
n.log.Debug("tick cancelled", zap.Error(err))
return err
}

@@ -696,6 +718,14 @@ func (n *dbNamespace) Tick(c context.Cancellable, startTime xtime.UnixNano) erro
n.metrics.tick.index.numBlocksEvicted.Inc(indexTickResults.NumBlocksEvicted)
n.metrics.tick.index.numBlocksSealed.Inc(indexTickResults.NumBlocksSealed)
n.metrics.tick.errors.Inc(int64(r.errors))
for _, metric := range r.metricToCardinality {
if metric.cardinality >= tickOptions.MinCardinalityToTrack {
// If the gauge does not exist, it will be created on the fly.
n.metrics.tick.metricCardinality.Gauge(string(metric.name)).Update(float64(metric.cardinality))
}
}
tickDurationNano := xtime.Since(tickStartTimeNano).Nanoseconds()
n.metrics.tick.tickDurationNano.Update(float64(tickDurationNano))

return nil
}
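To make the resulting metric names concrete, below is a small standalone sketch of the gauge naming using tally's test scope. It assumes github.com/uber-go/tally (already a dependency of this code), a parent "tick" scope as in newDatabaseNamespaceMetrics, and an illustrative metric name "http_requests"; the actual exporter may apply its own separator and sanitization on top of this.

package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

func main() {
	scope := tally.NewTestScope("", nil)
	tickScope := scope.SubScope("tick")

	// Mirrors metricCardinality: tickScope.SubScope("top_metric_") above: one gauge
	// per tracked metric, created on the fly when its cardinality is reported.
	topMetrics := tickScope.SubScope("top_metric_")
	topMetrics.Gauge("http_requests").Update(12345)

	for _, g := range scope.Snapshot().Gauges() {
		// Prints something like: tick.top_metric_.http_requests 12345
		fmt.Println(g.Name(), g.Value())
	}
}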
11 changes: 11 additions & 0 deletions src/dbnode/storage/options.go
@@ -176,6 +176,7 @@ type options struct {
permitsOptions permits.Options
limitsOptions limits.Options
coreFn xsync.CoreFn
tickOptions TickOptions
}

// NewOptions creates a new set of storage options with defaults.
@@ -934,6 +935,16 @@ func (o *options) SetCoreFn(value xsync.CoreFn) Options {
return &opts
}

func (o *options) SetTickOptions(value TickOptions) Options {
opts := *o
opts.tickOptions = value
return &opts
}

func (o *options) TickOptions() TickOptions {
return o.tickOptions
}

type noOpColdFlush struct{}

func (n *noOpColdFlush) ColdFlushNamespace(Namespace, ColdFlushNsOpts) (OnColdFlushNamespace, error) {
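The TickOptions type that these accessors store is not shown in this excerpt (it presumably lives next to the storage Options interface, e.g. in types.go). Inferred from its usage in server.go and namespace.go, it would look roughly like the sketch below; treat the comments as assumptions rather than the PR's actual doc strings.

// Sketch only: fields inferred from usage in this diff.
// TickOptions carries the top-metric tracking knobs from config into storage.
type TickOptions struct {
	// TopMetricsToTrack is the number of highest-cardinality metrics to export gauges for; <= 0 disables tracking.
	TopMetricsToTrack int
	// MinCardinalityToTrack is the minimum cardinality a metric needs in order to be reported.
	MinCardinalityToTrack int
	// MaxMapLenForTracking caps the size of the metric-name-to-cardinality map.
	MaxMapLenForTracking int
	// TopMetricsTrackingTicks reports the top metrics once every N ticks.
	TopMetricsTrackingTicks int
}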
105 changes: 92 additions & 13 deletions src/dbnode/storage/result.go
@@ -20,6 +20,27 @@

package storage

import (
"hash/fnv"
"sort"
)

func getHash(b []byte) uint64 {
hash := fnv.New64a()
hash.Write(b)
return hash.Sum64()
}

type metricCardinality struct {
name []byte
cardinality int
}

func newMetricCardinality(name []byte, cardinality int) (uint64, *metricCardinality) {
return getHash(name), &metricCardinality{
name: name,
cardinality: cardinality,
}
}

type tickResult struct {
activeSeries int
expiredSeries int
@@ -32,20 +53,78 @@ type tickResult struct {
mergedOutOfOrderBlocks int
errors int
evictedBuckets int
// The key is the hash value of the metric name.
metricToCardinality map[uint64]*metricCardinality
}

func (r *tickResult) trackTopMetrics() {
r.metricToCardinality = make(map[uint64]*metricCardinality)
}

func (r *tickResult) truncateTopMetrics(topN int) {
if topN <= 0 {
return
}
if r.metricToCardinality == nil || len(r.metricToCardinality) <= topN {
return
}
// TODO: use a heap to optimize this.
cardinalities := make([]int, 0, len(r.metricToCardinality))
for _, metric := range r.metricToCardinality {
cardinalities = append(cardinalities, metric.cardinality)
}
sort.Sort(sort.Reverse(sort.IntSlice(cardinalities)))
cutoffValue := cardinalities[topN-1]
cutoffValueQuota := 1
for i := topN - 2; i >= 0; i-- {
if cardinalities[i] == cutoffValue {
cutoffValueQuota++
} else {
break
}
}
for hash, metric := range r.metricToCardinality {
if metric.cardinality < cutoffValue {
delete(r.metricToCardinality, hash)
} else if metric.cardinality == cutoffValue {
if cutoffValueQuota > 0 {
cutoffValueQuota--
} else {
delete(r.metricToCardinality, hash)
}
}
}
}

func (r tickResult) merge(other tickResult) tickResult {
return tickResult{
activeSeries: r.activeSeries + other.activeSeries,
expiredSeries: r.expiredSeries + other.expiredSeries,
activeBlocks: r.activeBlocks + other.activeBlocks,
wiredBlocks: r.wiredBlocks + other.wiredBlocks,
pendingMergeBlocks: r.pendingMergeBlocks + other.pendingMergeBlocks,
unwiredBlocks: r.unwiredBlocks + other.unwiredBlocks,
madeExpiredBlocks: r.madeExpiredBlocks + other.madeExpiredBlocks,
madeUnwiredBlocks: r.madeUnwiredBlocks + other.madeUnwiredBlocks,
mergedOutOfOrderBlocks: r.mergedOutOfOrderBlocks + other.mergedOutOfOrderBlocks,
errors: r.errors + other.errors,
evictedBuckets: r.evictedBuckets + other.evictedBuckets,
// NB: this method modifies the receiver in-place.
func (r *tickResult) merge(other tickResult, topN int) {
r.activeSeries += other.activeSeries
r.expiredSeries += other.expiredSeries
r.activeBlocks += other.activeBlocks
r.wiredBlocks += other.wiredBlocks
r.pendingMergeBlocks += other.pendingMergeBlocks
r.unwiredBlocks += other.unwiredBlocks
r.madeExpiredBlocks += other.madeExpiredBlocks
r.madeUnwiredBlocks += other.madeUnwiredBlocks
r.mergedOutOfOrderBlocks += other.mergedOutOfOrderBlocks
r.errors += other.errors
r.evictedBuckets += other.evictedBuckets

if other.metricToCardinality == nil {
return
}
if r.metricToCardinality == nil {
r.metricToCardinality = other.metricToCardinality
return
}

for hash, otherMetric := range other.metricToCardinality {
if currentMetric, ok := r.metricToCardinality[hash]; ok {
currentMetric.cardinality += otherMetric.cardinality
} else {
r.metricToCardinality[hash] = otherMetric
}
}

r.truncateTopMetrics(topN)
}
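The cutoffValueQuota handling above has one subtle consequence worth a test: when more metrics tie at the cutoff cardinality than the remaining quota allows, the map is still truncated to exactly topN entries, but which of the tied metrics survive depends on Go's map iteration order. A sketch of such a test, which would sit next to result_test.go in package storage (the test name is hypothetical):

package storage

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestTruncateTopMetricsTies(t *testing.T) {
	var r tickResult
	r.trackTopMetrics()
	// Three metrics tie at cardinality 100 but only two slots are available.
	r.metricToCardinality[getHash([]byte("a"))] = &metricCardinality{name: []byte("a"), cardinality: 100}
	r.metricToCardinality[getHash([]byte("b"))] = &metricCardinality{name: []byte("b"), cardinality: 100}
	r.metricToCardinality[getHash([]byte("c"))] = &metricCardinality{name: []byte("c"), cardinality: 100}

	r.truncateTopMetrics(2)

	// Exactly topN entries remain; which two of the tied metrics survive is
	// map-iteration dependent, so only the length is asserted.
	require.Equal(t, 2, len(r.metricToCardinality))
}

Both this and the existing TestMerge should run with go test ./src/dbnode/storage -run 'TestMerge|TestTruncateTopMetricsTies' from the repo root, assuming a standard Go toolchain.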
38 changes: 38 additions & 0 deletions src/dbnode/storage/result_test.go
@@ -0,0 +1,38 @@
// Copyright (c) 2023 Databricks, Inc.
//

package storage

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMerge(t *testing.T) {
var a, b tickResult
a.metricToCardinality = map[uint64]*metricCardinality{
0: {name: []byte("a"), cardinality: 1},
1: {name: []byte("b"), cardinality: 101},
}
b.metricToCardinality = map[uint64]*metricCardinality{
1: {name: []byte("b"), cardinality: 1000},
2: {name: []byte("c"), cardinality: 101},
}
a.merge(b, 3)
require.Equal(t, 3, len(a.metricToCardinality))
require.Equal(t, 2, len(b.metricToCardinality))
require.Equal(t, 1, a.metricToCardinality[0].cardinality)
require.Equal(t, 1101, a.metricToCardinality[1].cardinality)
require.Equal(t, 101, a.metricToCardinality[2].cardinality)

a.metricToCardinality = map[uint64]*metricCardinality{
0: {name: []byte("a"), cardinality: 1},
1: {name: []byte("b"), cardinality: 101},
}
require.Equal(t, 2, len(a.metricToCardinality))
a.merge(b, 2)
require.Equal(t, 2, len(a.metricToCardinality))
require.Equal(t, 1101, a.metricToCardinality[1].cardinality)
require.Equal(t, 101, a.metricToCardinality[2].cardinality)
}