Skip to content

Commit

Permalink
sstable: make Category an enum
Browse files Browse the repository at this point in the history
Instead of using arbitrary strings, we use an enum. Categories must
now be registered upfront (from an `init` function). We also associate
the QoS level upon registration, simplifying the plumbing.
  • Loading branch information
RaduBerinde committed Nov 1, 2024
1 parent c1712d1 commit 71bb6ba
Show file tree
Hide file tree
Showing 16 changed files with 151 additions and 102 deletions.
14 changes: 4 additions & 10 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,11 +733,8 @@ func (c *compaction) newInputIters(
}
}()
iterOpts := IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-compaction",
QoSLevel: sstable.NonLatencySensitiveQoSLevel,
},
logger: c.logger,
Category: categoryCompaction,
logger: c.logger,
}

// Populate iters, rangeDelIters and rangeKeyIters with the appropriate
Expand Down Expand Up @@ -1246,11 +1243,8 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: c.version,
}
Expand Down
13 changes: 5 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,10 +579,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer,
snapshot: seqNum,
iterOpts: IterOptions{
// TODO(sumeer): replace with a parameter provided by the caller.
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-get",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
Category: categoryGet,
logger: d.opts.Logger,
snapshotForHideObsoletePoints: seqNum,
},
Expand Down Expand Up @@ -1262,7 +1259,7 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
// iteration is invalid in those cases.
func (d *DB) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum SeqNum) error,
Expand All @@ -1271,7 +1268,7 @@ func (d *DB) ScanInternal(
visitExternalFile func(sst *ExternalFile) error,
) error {
scanInternalOpts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
Expand Down Expand Up @@ -1369,7 +1366,7 @@ func finishInitializingInternalIter(
}
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)

if err := i.constructPointIter(i.opts.CategoryAndQoS, memtables, buf); err != nil {
if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1411,7 +1408,7 @@ func (i *Iterator) constructPointIter(
if collector := i.tc.dbOpts.sstStatsCollector; collector != nil {
internalOpts.iterStatsAccumulator = collector.Accumulator(
uint64(uintptr(unsafe.Pointer(i))),
i.opts.CategoryAndQoS,
i.opts.Category,
)
}
if i.opts.RangeKeyMasking.Filter != nil {
Expand Down
6 changes: 1 addition & 5 deletions get_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/sstable"
)

func TestGetIter(t *testing.T) {
Expand Down Expand Up @@ -455,10 +454,7 @@ func TestGetIter(t *testing.T) {
get.version = v
get.snapshot = ikey.SeqNum() + 1
get.iterOpts = IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-get",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
Category: categoryGet,
logger: testLogger{t},
snapshotForHideObsoletePoints: get.snapshot,
}
Expand Down
14 changes: 4 additions & 10 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,11 +1752,8 @@ func (d *DB) excise(
}
var err error
iters, err = d.newIters(ctx, m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
layer: manifest.Level(level),
Category: categoryIngest,
layer: manifest.Level(level),
}, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
itersLoaded = true
return err
Expand Down Expand Up @@ -2154,11 +2151,8 @@ func (d *DB) ingestApply(
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: current,
}
Expand Down
13 changes: 5 additions & 8 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func testIngestSharedImpl(
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1634,7 +1634,7 @@ func TestConcurrentExcise(t *testing.T) {
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var sharedSSTs []SharedSSTMeta
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -2071,7 +2071,7 @@ func TestIngestExternal(t *testing.T) {
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)

var externalFiles []ExternalFile
err = from.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, startKey, endKey,
err = from.ScanInternal(context.TODO(), sstable.CategoryUnknown, startKey, endKey,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
val, _, err := value.Value(nil)
require.NoError(t, err)
Expand Down Expand Up @@ -2356,11 +2356,8 @@ func TestIngestTargetLevel(t *testing.T) {
comparer: d.opts.Comparer,
newIters: d.newIters,
opts: IterOptions{
logger: d.opts.Logger,
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
},
logger: d.opts.Logger,
Category: categoryIngest,
},
v: d.mu.versions.currentVersion(),
}
Expand Down
2 changes: 1 addition & 1 deletion level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (l *levelIter) init(
l.tableOpts.PointKeyFilters = l.filtersBuf[:0:1]
}
l.tableOpts.UseL6Filters = opts.UseL6Filters
l.tableOpts.CategoryAndQoS = opts.CategoryAndQoS
l.tableOpts.Category = opts.Category
l.tableOpts.layer = l.layer
l.tableOpts.snapshotForHideObsoletePoints = opts.snapshotForHideObsoletePoints
l.comparer = comparer
Expand Down
4 changes: 2 additions & 2 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ func (r *replicateOp) runSharedReplicate(
) {
var sharedSSTs []pebble.SharedSSTMeta
var err error
err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
val, _, err := value.Value(nil)
if err != nil {
Expand Down Expand Up @@ -1977,7 +1977,7 @@ func (r *replicateOp) runExternalReplicate(
) {
var externalSSTs []pebble.ExternalFile
var err error
err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
err = source.ScanInternal(context.TODO(), sstable.CategoryUnknown, r.start, r.end,
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
val, _, err := value.Value(nil)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (m *LevelMetrics) WriteAmp() float64 {
return float64(m.BytesFlushed+m.BytesCompacted) / float64(m.BytesIn)
}

var categoryCompaction = sstable.RegisterCategory("pebble-compaction", sstable.NonLatencySensitiveQoSLevel)
var categoryIngest = sstable.RegisterCategory("pebble-ingest", sstable.LatencySensitiveQoSLevel)
var categoryGet = sstable.RegisterCategory("pebble-get", sstable.LatencySensitiveQoSLevel)

// Metrics holds metrics for various subsystems of the DB such as the Cache,
// Compactions, WAL, and per-Level metrics.
//
Expand Down
22 changes: 12 additions & 10 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ func exampleMetrics() Metrics {
return m
}

func init() {
// Register some categories for the purposes of the test.
sstable.RegisterCategory("a", sstable.NonLatencySensitiveQoSLevel)
sstable.RegisterCategory("b", sstable.LatencySensitiveQoSLevel)
sstable.RegisterCategory("c", sstable.NonLatencySensitiveQoSLevel)
}

func TestMetrics(t *testing.T) {
if runtime.GOARCH == "386" {
t.Skip("skipped on 32-bit due to slightly varied output")
Expand Down Expand Up @@ -306,18 +313,13 @@ func TestMetrics(t *testing.T) {
return err.Error()
}
}
var categoryAndQoS sstable.CategoryAndQoS
category := sstable.CategoryUnknown
if td.HasArg("category") {
var s string
td.ScanArgs(t, "category", &s)
categoryAndQoS.Category = sstable.Category(s)
}
if td.HasArg("qos") {
var qos string
td.ScanArgs(t, "qos", &qos)
categoryAndQoS.QoSLevel = sstable.StringToQoSForTesting(qos)
category = sstable.StringToCategoryForTesting(s)
}
iter, _ := d.NewIter(&IterOptions{CategoryAndQoS: categoryAndQoS})
iter, _ := d.NewIter(&IterOptions{Category: category})
// Some iterators (eg. levelIter) do not instantiate the underlying
// iterator until the first positioning call. Position the iterator
// so that levelIters will have loaded an sstable.
Expand All @@ -342,7 +344,7 @@ func TestMetrics(t *testing.T) {
m.TableCache = cache.Metrics{}
m.BlockCache = cache.Metrics{}
// Empirically, the unknown stats are also non-deterministic.
if len(m.CategoryStats) > 0 && m.CategoryStats[0].Category == "_unknown" {
if len(m.CategoryStats) > 0 && m.CategoryStats[0].Category == sstable.CategoryUnknown {
m.CategoryStats[0].CategoryStats = sstable.CategoryStats{}
}
}
Expand All @@ -352,7 +354,7 @@ func TestMetrics(t *testing.T) {
fmt.Fprintf(&buf, "Iter category stats:\n")
for _, stats := range m.CategoryStats {
fmt.Fprintf(&buf, "%20s, %11s: %+v\n", stats.Category,
redact.StringWithoutMarkers(stats.QoSLevel), stats.CategoryStats)
redact.StringWithoutMarkers(stats.Category.QoSLevel()), stats.CategoryStats)
}
}
return buf.String()
Expand Down
7 changes: 4 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ type IterOptions struct {
// existing is not low or if we just expect a one-time Seek (where loading the
// data block directly is better).
UseL6Filters bool
// CategoryAndQoS is used for categorized iterator stats. This should not be
// Category is used for categorized iterator stats. This should not be
// changed by calling SetOptions.
sstable.CategoryAndQoS
Category sstable.Category

DebugRangeKeyStack bool

Expand Down Expand Up @@ -261,9 +261,10 @@ func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
// scanInternalOptions is similar to IterOptions, meant for use with
// scanInternalIterator.
type scanInternalOptions struct {
sstable.CategoryAndQoS
IterOptions

category sstable.Category

visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
visitRangeDel func(start, end []byte, seqNum SeqNum) error
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
Expand Down
4 changes: 2 additions & 2 deletions scan_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func (opts *scanInternalOptions) skipLevelForOpts() int {

// constructPointIter constructs a merging iterator and sets i.iter to it.
func (i *scanInternalIterator) constructPointIter(
categoryAndQoS sstable.CategoryAndQoS, memtables flushableList, buf *iterAlloc,
category sstable.Category, memtables flushableList, buf *iterAlloc,
) error {
// Merging levels and levels from iterAlloc.
mlevels := buf.mlevels[:0]
Expand Down Expand Up @@ -897,7 +897,7 @@ func (i *scanInternalIterator) constructPointIter(
levels = levels[:numLevelIters]
rangeDelLevels = rangeDelLevels[:numLevelIters]
i.opts.IterOptions.snapshotForHideObsoletePoints = i.seqNum
i.opts.IterOptions.CategoryAndQoS = categoryAndQoS
i.opts.IterOptions.Category = category
addLevelIterForFiles := func(files manifest.LevelIterator, level manifest.Layer) {
li := &levels[levelsIndex]
rli := &rangeDelLevels[levelsIndex]
Expand Down
4 changes: 2 additions & 2 deletions scan_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestScanInternal(t *testing.T) {
type scanInternalReader interface {
ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand Down Expand Up @@ -561,7 +561,7 @@ func TestScanInternal(t *testing.T) {
}
}
}
err := reader.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, lower, upper,
err := reader.ScanInternal(context.TODO(), sstable.CategoryUnknown, lower, upper,
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
v := value.InPlaceValue()
fmt.Fprintf(&b, "%s (%s)\n", key, v)
Expand Down
8 changes: 4 additions & 4 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Snapshot) NewIterWithContext(ctx context.Context, o *IterOptions) (*Ite
// point keys deleted by range dels and keys masked by range keys.
func (s *Snapshot) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand All @@ -87,7 +87,7 @@ func (s *Snapshot) ScanInternal(
panic(ErrClosed)
}
scanInternalOpts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
visitPointKey: visitPointKey,
visitRangeDel: visitRangeDel,
visitRangeKey: visitRangeKey,
Expand Down Expand Up @@ -473,7 +473,7 @@ func (es *EventuallyFileOnlySnapshot) NewIterWithContext(
// point keys deleted by range dels and keys masked by range keys.
func (es *EventuallyFileOnlySnapshot) ScanInternal(
ctx context.Context,
categoryAndQoS sstable.CategoryAndQoS,
category sstable.Category,
lower, upper []byte,
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
visitRangeDel func(start, end []byte, seqNum base.SeqNum) error,
Expand All @@ -486,7 +486,7 @@ func (es *EventuallyFileOnlySnapshot) ScanInternal(
}
var sOpts snapshotIterOpts
opts := &scanInternalOptions{
CategoryAndQoS: categoryAndQoS,
category: category,
IterOptions: IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
LowerBound: lower,
Expand Down
Loading

0 comments on commit 71bb6ba

Please sign in to comment.