Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate Limit ScanStatistics #2779

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/atomicfs"
"github.com/cockroachdb/tokenbucket"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -1183,6 +1184,7 @@ func (d *DB) ScanInternal(
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
includeObsoleteKeys bool,
rateLimitFunc func(key *InternalKey, val LazyValue),
) error {
scanInternalOpts := &scanInternalOptions{
visitPointKey: visitPointKey,
Expand All @@ -1191,6 +1193,7 @@ func (d *DB) ScanInternal(
visitSharedFile: visitSharedFile,
skipSharedLevels: visitSharedFile != nil,
includeObsoleteKeys: includeObsoleteKeys,
rateLimitFunc: rateLimitFunc,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
Expand Down Expand Up @@ -2668,15 +2671,43 @@ type LSMKeyStatistics struct {
Accumulated KeyStatistics
// Levels contains statistics only for point keys. Range deletions and range keys will
// appear in Accumulated but not Levels.
Levels [numLevels]KeyStatistics
Levels [numLevels]KeyStatistics
// BytesRead represents the logical, pre-compression size of keys and values read
BytesRead uint64
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm right now I am not using this BytesRead field, did you have an idea for if we should use it for rate limiting @jbowens

}

// ScanStatisticsOptions is used by DB.ScanStatistics.
type ScanStatisticsOptions struct {
// LimitBytesPerSecond indicates the number of bytes that are able to be read
// per second using ScanInternal.
// A value of 0 indicates that there is no limit set.
LimitBytesPerSecond int64
}

// ScanStatistics returns the count of different key kinds within the lsm for a
// key span [lower, upper) as well as the number of snapshot keys.
func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeyStatistics, error) {
func (d *DB) ScanStatistics(
ctx context.Context, lower, upper []byte, opts ScanStatisticsOptions,
) (LSMKeyStatistics, error) {
stats := LSMKeyStatistics{}
var prevKey InternalKey
var rateLimitFunc func(key *InternalKey, val LazyValue)
tb := tokenbucket.TokenBucket{}

if opts.LimitBytesPerSecond != 0 {
// Each "token" roughly corresponds to a byte that was read.
tb.Init(tokenbucket.TokensPerSecond(opts.LimitBytesPerSecond), tokenbucket.Tokens(1024))
rateLimitFunc = func(key *InternalKey, val LazyValue) {
for {
fulfilled, tryAgainAfter := tb.TryToFulfill(tokenbucket.Tokens(key.Size() + val.Len()))

if fulfilled {
break
}
time.Sleep(tryAgainAfter)
}
}
}

err := d.ScanInternal(ctx, lower, upper,
func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
Expand Down Expand Up @@ -2714,6 +2745,7 @@ func (d *DB) ScanStatistics(ctx context.Context, lower, upper []byte) (LSMKeySta
},
nil, /* visitSharedFile */
true, /* includeObsoleteKeys */
rateLimitFunc,
)

if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,9 @@ func TestIngestShared(t *testing.T) {
sharedSSTs = append(sharedSSTs, *sst)
return nil
},
false)
false,
nil, /* rateLimitFunc */
)
require.NoError(t, err)
require.NoError(t, w.Close())

Expand Down
3 changes: 3 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ type scanInternalOptions struct {
// includeObsoleteKeys specifies whether keys shadowed by newer internal keys
// are exposed. If false, only one internal key per user key is exposed.
includeObsoleteKeys bool

// rateLimitFunc is used to limit the amount of bytes read per second.
rateLimitFunc func(key *InternalKey, value LazyValue)
}

// RangeKeyMasking configures automatic hiding of point keys by range keys. A
Expand Down
5 changes: 5 additions & 0 deletions scan_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,11 @@ func scanInternalImpl(

for valid := iter.seekGE(lower); valid && iter.error() == nil; valid = iter.next() {
key := iter.unsafeKey()

if opts.rateLimitFunc != nil {
opts.rateLimitFunc(key, iter.lazyValue())
}

switch key.Kind() {
case InternalKeyKindRangeKeyDelete, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeySet:
if opts.visitRangeKey != nil {
Expand Down
5 changes: 4 additions & 1 deletion scan_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestScanStatistics(t *testing.T) {
ScanStatistics(
ctx context.Context,
lower, upper []byte,
opts ScanStatisticsOptions,
) (LSMKeyStatistics, error)
}
batches := map[string]*Batch{}
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestScanStatistics(t *testing.T) {
default:
}
}
stats, err := reader.ScanStatistics(ctx, lower, upper)
stats, err := reader.ScanStatistics(ctx, lower, upper, ScanStatisticsOptions{})
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -208,6 +209,7 @@ func TestScanInternal(t *testing.T) {
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
includeObsoleteKeys bool,
rateLimitFunc func(key *InternalKey, val LazyValue),
) error
}
batches := map[string]*Batch{}
Expand Down Expand Up @@ -429,6 +431,7 @@ func TestScanInternal(t *testing.T) {
},
fileVisitor,
false,
nil, /* rateLimitFunc */
)
if err != nil {
return err.Error()
Expand Down
2 changes: 2 additions & 0 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *Snapshot) ScanInternal(
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *SharedSSTMeta) error,
includeObsoleteKeys bool,
rateLimitFunc func(key *InternalKey, value LazyValue),
) error {
if s.db == nil {
panic(ErrClosed)
Expand All @@ -82,6 +83,7 @@ func (s *Snapshot) ScanInternal(
visitSharedFile: visitSharedFile,
skipSharedLevels: visitSharedFile != nil,
includeObsoleteKeys: includeObsoleteKeys,
rateLimitFunc: rateLimitFunc,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
Expand Down
Loading