From 48f54caed873eedea9ce03696a9d52d21ddf77a7 Mon Sep 17 00:00:00 2001 From: Rahul Aggarwal Date: Mon, 31 Jul 2023 11:23:27 -0400 Subject: [PATCH] Rate Limit Scan Statistics This pull request uses a `TokenBucket` to limit the number of bytes (key size plus value length) that `ScanStatistics` reads within a given period of time. Fixes: #2778 --- db.go | 30 +++++++++++++++++++++++++++++- ingest_test.go | 4 +++- options.go | 3 +++ scan_internal.go | 5 +++++ scan_internal_test.go | 5 ++++- snapshot.go | 2 ++ 6 files changed, 46 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 8c4e35613f2..a5984ee495a 100644 --- a/db.go +++ b/db.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" + "github.com/cockroachdb/tokenbucket" "github.com/prometheus/client_golang/prometheus" ) @@ -1178,6 +1179,7 @@ func (d *DB) ScanInternal( visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, val LazyValue), ) error { scanInternalOpts := &scanInternalOptions{ visitPointKey: visitPointKey, @@ -1186,6 +1188,7 @@ func (d *DB) ScanInternal( visitSharedFile: visitSharedFile, skipSharedLevels: visitSharedFile != nil, includeObsoleteKeys: includeObsoleteKeys, + rateLimitFunc: rateLimitFunc, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, @@ -2599,11 +2602,35 @@ type LSMKeyStatistics struct { levels [numLevels]KeyStatistics } +type ScanStatisticsOptions struct { + rateLimitEnabled bool + rate float64 + burst float64 +} + // ScanStatistics returns the count of different key kinds within the lsm for a // key span [lower, upper) as well as the number of snapshot keys. 
-func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) { +func (d *DB) ScanStatistics( + ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions, +) (LSMKeyStatistics, error) { stats := LSMKeyStatistics{} var prevKey InternalKey + tb := tokenbucket.TokenBucket{} + tb.Init(tokenbucket.TokensPerSecond(opts.rate), tokenbucket.Tokens(opts.burst)) + + var rateLimitFunc func(key *InternalKey, val LazyValue) + if opts.rateLimitEnabled { + rateLimitFunc = func(key *InternalKey, val LazyValue) { + for { + fulfilled, tryAgainAfter := tb.TryToFulfill(tokenbucket.Tokens(key.Size() + val.Len())) + + if fulfilled { + break + } + time.Sleep(tryAgainAfter) + } + } + } err := d.ScanInternal(ctx, lower, upper, func(key *InternalKey, value LazyValue, iterInfo iterInfo) error { @@ -2639,6 +2666,7 @@ func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeySta }, nil, true, + rateLimitFunc, ) if err != nil { diff --git a/ingest_test.go b/ingest_test.go index effacc060a4..cba29410596 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -932,7 +932,9 @@ func TestIngestShared(t *testing.T) { sharedSSTs = append(sharedSSTs, *sst) return nil }, - false) + false, + nil, /* rateLimitFunc */ + ) require.NoError(t, err) require.NoError(t, w.Close()) diff --git a/options.go b/options.go index 805d2c469b9..ca4f4fa64d7 100644 --- a/options.go +++ b/options.go @@ -259,6 +259,9 @@ type scanInternalOptions struct { // includeObsoleteKeys specifies whether keys shadowed by newer internal keys // are exposed. If false, only one internal key per user key is exposed. includeObsoleteKeys bool + + // rateLimitFunc is used to limit the amount of bytes read per second. + rateLimitFunc func(key *InternalKey, value LazyValue) } // RangeKeyMasking configures automatic hiding of point keys by range keys. 
A diff --git a/scan_internal.go b/scan_internal.go index 0f19d0683b8..c2d9c00967e 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -941,6 +941,11 @@ func scanInternalImpl( for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() { key := iter.unsafeKey() + + if opts.rateLimitFunc != nil { + opts.rateLimitFunc(key, iter.lazyValue()) + } + switch key.Kind() { case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet: if opts.visitRangeKey != nil { diff --git a/scan_internal_test.go b/scan_internal_test.go index c29485cbdda..f86ee101bf1 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -31,6 +31,7 @@ func TestScanStatistics(t *testing.T) { ScanStatistics( ctx context.Context, lower, upper []byte, + opts ScanStatisticsOptions, ) (LSMKeyStatistics, error) } batches := map[string]*Batch{} @@ -163,7 +164,7 @@ func TestScanStatistics(t *testing.T) { default: } } - stats, err := reader.ScanStatistics(ctx, lower, upper) + stats, err := reader.ScanStatistics(ctx, lower, upper, ScanStatisticsOptions{rateLimitEnabled: false}) if err != nil { return err.Error() } @@ -208,6 +209,7 @@ func TestScanInternal(t *testing.T) { visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, val LazyValue), ) error } batches := map[string]*Batch{} @@ -429,6 +431,7 @@ func TestScanInternal(t *testing.T) { }, fileVisitor, false, + nil, /* rateLimitFunc */ ) if err != nil { return err.Error() diff --git a/snapshot.go b/snapshot.go index af66042ea41..2b5d0e83d97 100644 --- a/snapshot.go +++ b/snapshot.go @@ -71,6 +71,7 @@ func (s *Snapshot) ScanInternal( visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, value LazyValue), ) error { if s.db == nil { panic(ErrClosed) @@ 
-82,6 +83,7 @@ func (s *Snapshot) ScanInternal( visitSharedFile: visitSharedFile, skipSharedLevels: visitSharedFile != nil, includeObsoleteKeys: includeObsoleteKeys, + rateLimitFunc: rateLimitFunc, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower,