From e02f8adc49f124f73ed60ed6674dd1ec0a7ef2b3 Mon Sep 17 00:00:00 2001 From: Rahul Aggarwal Date: Mon, 31 Jul 2023 11:23:27 -0400 Subject: [PATCH] Rate Limit Scan Statistics This pull request uses a `TokenBucket` to limit the number of keys that read from `ScanStatistics` within a certain period of time. Fixes: #2778 --- db.go | 36 ++++++++++++++++++++++++++++++++++-- ingest_test.go | 4 +++- options.go | 3 +++ scan_internal.go | 5 +++++ scan_internal_test.go | 5 ++++- snapshot.go | 2 ++ 6 files changed, 51 insertions(+), 4 deletions(-) diff --git a/db.go b/db.go index ea4781f1ad..4e25299c12 100644 --- a/db.go +++ b/db.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" + "github.com/cockroachdb/tokenbucket" "github.com/prometheus/client_golang/prometheus" ) @@ -1183,6 +1184,7 @@ func (d *DB) ScanInternal( visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, val LazyValue), ) error { scanInternalOpts := &scanInternalOptions{ visitPointKey: visitPointKey, @@ -1191,6 +1193,7 @@ func (d *DB) ScanInternal( visitSharedFile: visitSharedFile, skipSharedLevels: visitSharedFile != nil, includeObsoleteKeys: includeObsoleteKeys, + rateLimitFunc: rateLimitFunc, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower, @@ -2668,15 +2671,43 @@ type LSMKeyStatistics struct { Accumulated KeyStatistics // Levels contains statistics only for point keys. Range deletions and range keys will // appear in Accumulated but not Levels. - Levels [numLevels]KeyStatistics + Levels [numLevels]KeyStatistics + // BytesRead represents the logical, pre-compression size of keys and values read BytesRead uint64 } +// ScanStatisticsOptions is used by DB.ScanStatistics. +type ScanStatisticsOptions struct { + // LimitBytesPerSecond indicates the number of bytes that are able to be read + // per second using ScanInternal. + // A value of 0 indicates that there is no limit set. + LimitBytesPerSecond int64 +} + // ScanStatistics returns the count of different key kinds within the lsm for a // key span [lower, upper) as well as the number of snapshot keys. -func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) { +func (d *DB) ScanStatistics( + ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions, +) (LSMKeyStatistics, error) { stats := LSMKeyStatistics{} var prevKey InternalKey + var rateLimitFunc func(key *InternalKey, val LazyValue) + tb := tokenbucket.TokenBucket{} + + if opts.LimitBytesPerSecond != 0 { + // Each "token" roughly corresponds to a byte that was read. + tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024)) + rateLimitFunc = func(key *InternalKey, val LazyValue) { + for { + fulfilled, tryAgainAfter := tb.TryToFulfill(tokenbucket.Tokens(key.Size() + val.Len())) + + if fulfilled { + break + } + time.Sleep(tryAgainAfter) + } + } + } err := d.ScanInternal(ctx, lower, upper, func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error { @@ -2714,6 +2745,7 @@ func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeySta }, nil, /* visitSharedFile */ true, /* includeObsoleteKeys */ + rateLimitFunc, ) if err != nil { diff --git a/ingest_test.go b/ingest_test.go index 2f1ef8eec7..6f5ad68754 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -932,7 +932,9 @@ func TestIngestShared(t *testing.T) { sharedSSTs = append(sharedSSTs, *sst) return nil }, - false) + false, + nil, /* rateLimitFunc */ + ) require.NoError(t, err) require.NoError(t, w.Close()) diff --git a/options.go b/options.go index c3a8df684c..aad95b0add 100644 --- a/options.go +++ b/options.go @@ -259,6 +259,9 @@ type scanInternalOptions struct { // includeObsoleteKeys specifies whether keys shadowed by newer internal keys // are exposed. If false, only one internal key per user key is exposed. includeObsoleteKeys bool + + // rateLimitFunc is used to limit the amount of bytes read per second. + rateLimitFunc func(key *InternalKey, value LazyValue) } // RangeKeyMasking configures automatic hiding of point keys by range keys. A diff --git a/scan_internal.go b/scan_internal.go index ef3275be97..63d919d51c 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -639,6 +639,11 @@ func scanInternalImpl( for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() { key := iter.unsafeKey() + + if opts.rateLimitFunc != nil { + opts.rateLimitFunc(key, iter.lazyValue()) + } + switch key.Kind() { case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet: if opts.visitRangeKey != nil { diff --git a/scan_internal_test.go b/scan_internal_test.go index 88c80e7b44..fc3a36a62e 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -31,6 +31,7 @@ func TestScanStatistics(t *testing.T) { ScanStatistics( ctx context.Context, lower, upper []byte, + opts ScanStatisticsOptions, ) (LSMKeyStatistics, error) } batches := map[string]*Batch{} @@ -163,7 +164,7 @@ func TestScanStatistics(t *testing.T) { default: } } - stats, err := reader.ScanStatistics(ctx, lower, upper) + stats, err := reader.ScanStatistics(ctx, lower, upper, ScanStatisticsOptions{}) if err != nil { return err.Error() } @@ -208,6 +209,7 @@ func TestScanInternal(t *testing.T) { visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, val LazyValue), ) error } batches := map[string]*Batch{} @@ -429,6 +431,7 @@ func TestScanInternal(t *testing.T) { }, fileVisitor, false, + nil, /* rateLimitFunc */ ) if err != nil { return err.Error() diff --git a/snapshot.go b/snapshot.go index 98eccccfde..430e802fce 100644 --- a/snapshot.go +++ b/snapshot.go @@ -71,6 +71,7 @@ func (s *Snapshot) ScanInternal( visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, includeObsoleteKeys bool, + rateLimitFunc func(key *InternalKey, value LazyValue), ) error { if s.db == nil { panic(ErrClosed) @@ -82,6 +83,7 @@ func (s *Snapshot) ScanInternal( visitSharedFile: visitSharedFile, skipSharedLevels: visitSharedFile != nil, includeObsoleteKeys: includeObsoleteKeys, + rateLimitFunc: rateLimitFunc, IterOptions: IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, LowerBound: lower,