-
Notifications
You must be signed in to change notification settings - Fork 452
/
flushable.go
473 lines (434 loc) · 15.8 KB
/
flushable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
)
// flushable defines the interface for immutable memtables.
type flushable interface {
newIter(o *IterOptions) internalIterator
newFlushIter(o *IterOptions) internalIterator
newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
containsRangeKeys() bool
// inuseBytes returns the number of inuse bytes by the flushable.
inuseBytes() uint64
// totalBytes returns the total number of bytes allocated by the flushable.
totalBytes() uint64
// readyForFlush returns true when the flushable is ready for flushing. See
// memTable.readyForFlush for one implementation which needs to check whether
// there are any outstanding write references.
readyForFlush() bool
// computePossibleOverlaps determines whether the flushable's keys overlap
// with the bounds of any of the provided bounded items. If an item overlaps
// or might overlap but it's not possible to determine overlap cheaply,
// computePossibleOverlaps invokes the provided function with the object
// that might overlap. computePossibleOverlaps must not perform any I/O and
// implementations should invoke the provided function for items that would
// require I/O to determine overlap.
computePossibleOverlaps(overlaps func(bounded) shouldContinue, bounded ...bounded)
}
type shouldContinue bool
const (
continueIteration shouldContinue = true
stopIteration = false
)
type bounded interface {
UserKeyBounds() base.UserKeyBounds
}
var _ bounded = (*fileMetadata)(nil)
var _ bounded = KeyRange{}
func sliceAsBounded[B bounded](s []B) []bounded {
ret := make([]bounded, len(s))
for i := 0; i < len(s); i++ {
ret[i] = s[i]
}
return ret
}
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
flushable
// Channel which is closed when the flushable has been flushed.
flushed chan struct{}
// flushForced indicates whether a flush was forced on this memtable (either
// manual, or due to ingestion). Protected by DB.mu.
flushForced bool
// delayedFlushForcedAt indicates whether a timer has been set to force a
// flush on this memtable at some point in the future. Protected by DB.mu.
// Holds the timestamp of when the flush will be issued.
delayedFlushForcedAt time.Time
// logNum corresponds to the WAL that contains the records present in the
// receiver.
logNum base.DiskFileNum
// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
logSize uint64
// The current logSeqNum at the time the memtable was created. This is
// guaranteed to be less than or equal to any seqnum stored in the memtable.
logSeqNum base.SeqNum
// readerRefs tracks the read references on the flushable. The two sources of
// reader references are DB.mu.mem.queue and readState.memtables. The memory
// reserved by the flushable in the cache is released when the reader refs
// drop to zero. If the flushable is referencing sstables, then the file
// refount is also decreased once the reader refs drops to 0. If the
// flushable is a memTable, when the reader refs drops to zero, the writer
// refs will already be zero because the memtable will have been flushed and
// that only occurs once the writer refs drops to zero.
readerRefs atomic.Int32
// Closure to invoke to release memory accounting.
releaseMemAccounting func()
// unrefFiles, if not nil, should be invoked to decrease the ref count of
// files which are backing the flushable.
unrefFiles func() []*fileBacking
// deleteFnLocked should be called if the caller is holding DB.mu.
deleteFnLocked func(obsolete []*fileBacking)
// deleteFn should be called if the caller is not holding DB.mu.
deleteFn func(obsolete []*fileBacking)
}
func (e *flushableEntry) readerRef() {
switch v := e.readerRefs.Add(1); {
case v <= 1:
panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
}
}
// db.mu must not be held when this is called.
func (e *flushableEntry) readerUnref(deleteFiles bool) {
e.readerUnrefHelper(deleteFiles, e.deleteFn)
}
// db.mu must be held when this is called.
func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
}
func (e *flushableEntry) readerUnrefHelper(
deleteFiles bool, deleteFn func(obsolete []*fileBacking),
) {
switch v := e.readerRefs.Add(-1); {
case v < 0:
panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
case v == 0:
if e.releaseMemAccounting == nil {
panic("pebble: memtable reservation already released")
}
e.releaseMemAccounting()
e.releaseMemAccounting = nil
if e.unrefFiles != nil {
obsolete := e.unrefFiles()
e.unrefFiles = nil
if deleteFiles {
deleteFn(obsolete)
}
}
}
}
type flushableList []*flushableEntry
// ingestedFlushable is the implementation of the flushable interface for the
// ingesting sstables which are added to the flushable list.
type ingestedFlushable struct {
// files are non-overlapping and ordered (according to their bounds).
files []physicalMeta
comparer *Comparer
newIters tableNewIters
newRangeKeyIters keyspanimpl.TableNewSpanIter
// Since the level slice is immutable, we construct and set it once. It
// should be safe to read from slice in future reads.
slice manifest.LevelSlice
// hasRangeKeys is set on ingestedFlushable construction.
hasRangeKeys bool
// exciseSpan is populated if an excise operation should be performed during
// flush.
exciseSpan KeyRange
exciseSeqNum base.SeqNum
}
func newIngestedFlushable(
files []*fileMetadata,
comparer *Comparer,
newIters tableNewIters,
newRangeKeyIters keyspanimpl.TableNewSpanIter,
exciseSpan KeyRange,
seqNum base.SeqNum,
) *ingestedFlushable {
if invariants.Enabled {
for i := 1; i < len(files); i++ {
prev := files[i-1].UserKeyBounds()
this := files[i].UserKeyBounds()
if prev.End.IsUpperBoundFor(comparer.Compare, this.Start) {
panic(errors.AssertionFailedf("ingested flushable files overlap: %s %s", prev, this))
}
}
}
var physicalFiles []physicalMeta
var hasRangeKeys bool
for _, f := range files {
if f.HasRangeKeys {
hasRangeKeys = true
}
physicalFiles = append(physicalFiles, f.PhysicalMeta())
}
ret := &ingestedFlushable{
files: physicalFiles,
comparer: comparer,
newIters: newIters,
newRangeKeyIters: newRangeKeyIters,
// slice is immutable and can be set once and used many times.
slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files),
hasRangeKeys: hasRangeKeys,
exciseSpan: exciseSpan,
exciseSeqNum: seqNum,
}
return ret
}
// TODO(sumeer): ingestedFlushable iters also need to plumb context for
// tracing.
// newIter is part of the flushable interface.
func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
var opts IterOptions
if o != nil {
opts = *o
}
return newLevelIter(
context.Background(), opts, s.comparer, s.newIters, s.slice.Iter(), manifest.FlushableIngestsLayer(),
internalIterOpts{},
)
}
// newFlushIter is part of the flushable interface.
func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
// newFlushIter is only used for writing memtables to disk as sstables.
// Since ingested sstables are already present on disk, they don't need to
// make use of a flush iter.
panic("pebble: not implemented")
}
func (s *ingestedFlushable) constructRangeDelIter(
ctx context.Context, file *manifest.FileMetadata, _ keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
iters, err := s.newIters(ctx, file, nil, internalIterOpts{}, iterRangeDeletions)
if err != nil {
return nil, err
}
return iters.RangeDeletion(), nil
}
// newRangeDelIter is part of the flushable interface.
// TODO(bananabrick): Using a level iter instead of a keyspan level iter to
// surface range deletes is more efficient.
//
// TODO(sumeer): *IterOptions are being ignored, so the index block load for
// the point iterator in constructRangeDeIter is not tracked.
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare,
s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(),
manifest.KeyTypePoint,
)
if !s.exciseSpan.Valid() {
return liter
}
// We have an excise span to weave into the rangedel iterators.
//
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
rdel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeDelete)}},
}
rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel})
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter)
return miter
}
// newRangeKeyIter is part of the flushable interface.
func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
var rkeydelIter keyspan.FragmentIterator
if s.exciseSpan.Valid() {
// We have an excise span to weave into the rangekey iterators.
rkeydel := keyspan.Span{
Start: s.exciseSpan.Start,
End: s.exciseSpan.End,
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.exciseSeqNum, base.InternalKeyKindRangeKeyDelete)}},
}
rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel})
}
if !s.hasRangeKeys {
if rkeydelIter == nil {
// NB: we have to return the nil literal as opposed to the nil
// value of rkeydelIter, otherwise callers of this function will
// have the return value fail == nil checks.
return nil
}
return rkeydelIter
}
liter := keyspanimpl.NewLevelIter(
context.TODO(),
keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters,
s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange,
)
if rkeydelIter == nil {
return liter
}
// TODO(bilal): should this be pooled?
miter := &keyspanimpl.MergingIter{}
miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rkeydelIter)
return miter
}
// containsRangeKeys is part of the flushable interface.
func (s *ingestedFlushable) containsRangeKeys() bool {
return s.hasRangeKeys || s.exciseSpan.Valid()
}
// inuseBytes is part of the flushable interface.
func (s *ingestedFlushable) inuseBytes() uint64 {
// inuseBytes is only used when memtables are flushed to disk as sstables.
panic("pebble: not implemented")
}
// totalBytes is part of the flushable interface.
func (s *ingestedFlushable) totalBytes() uint64 {
// We don't allocate additional bytes for the ingestedFlushable.
return 0
}
// readyForFlush is part of the flushable interface.
func (s *ingestedFlushable) readyForFlush() bool {
// ingestedFlushable should always be ready to flush. However, note that
// memtables before the ingested sstables in the memtable queue must be
// flushed before an ingestedFlushable can be flushed. This is because the
// ingested sstables need an updated view of the Version to
// determine where to place the files in the lsm.
return true
}
// computePossibleOverlaps is part of the flushable interface.
func (s *ingestedFlushable) computePossibleOverlaps(
fn func(bounded) shouldContinue, bounded ...bounded,
) {
for _, b := range bounded {
if s.anyFileOverlaps(b.UserKeyBounds()) {
// Some file overlaps in key boundaries. The file doesn't necessarily
// contain any keys within the key range, but we would need to perform I/O
// to know for sure. The flushable interface dictates that we're not
// permitted to perform I/O here, so err towards assuming overlap.
if !fn(b) {
return
}
}
}
}
// anyFileBoundsOverlap returns true if there is at least a file in s.files with
// bounds that overlap the given bounds.
func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool {
// Note that s.files are non-overlapping and sorted.
for _, f := range s.files {
fileBounds := f.UserKeyBounds()
if !fileBounds.End.IsUpperBoundFor(s.comparer.Compare, bounds.Start) {
// The file ends before the bounds start. Go to the next file.
continue
}
if !bounds.End.IsUpperBoundFor(s.comparer.Compare, fileBounds.Start) {
// The file starts after the bounds end. There is no overlap, and
// further files will not overlap either (the files are sorted).
break
}
// There is overlap. Note that UserKeyBounds.Overlaps() performs exactly the
// checks above.
return true
}
if s.exciseSpan.Valid() {
uk := s.exciseSpan.UserKeyBounds()
return uk.Overlaps(s.comparer.Compare, &bounds)
}
return false
}
// computePossibleOverlapsGenericImpl is an implementation of the flushable
// interface's computePossibleOverlaps function for flushable implementations
// with only in-memory state that do not have special requirements and should
// read through the ordinary flushable iterators.
//
// This function must only be used with implementations that are infallible (eg,
// memtable iterators) and will panic if an error is encountered.
func computePossibleOverlapsGenericImpl[F flushable](
f F, cmp Compare, fn func(bounded) shouldContinue, bounded []bounded,
) {
iter := f.newIter(nil)
rangeDelIter := f.newRangeDelIter(nil)
rangeKeyIter := f.newRangeKeyIter(nil)
for _, b := range bounded {
overlap, err := determineOverlapAllIters(cmp, b.UserKeyBounds(), iter, rangeDelIter, rangeKeyIter)
if invariants.Enabled && err != nil {
panic(errors.AssertionFailedf("expected iterator to be infallible: %v", err))
}
if overlap {
if !fn(b) {
break
}
}
}
if iter != nil {
if err := iter.Close(); err != nil {
// This implementation must be used in circumstances where
// reading through the iterator is infallible.
panic(err)
}
}
if rangeDelIter != nil {
rangeDelIter.Close()
}
if rangeKeyIter != nil {
rangeKeyIter.Close()
}
}
// determineOverlapAllIters checks for overlap in a point iterator, range
// deletion iterator and range key iterator.
func determineOverlapAllIters(
cmp base.Compare,
bounds base.UserKeyBounds,
pointIter base.InternalIterator,
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
) (bool, error) {
if pointIter != nil {
if pointOverlap, err := determineOverlapPointIterator(cmp, bounds, pointIter); pointOverlap || err != nil {
return pointOverlap, err
}
}
if rangeDelIter != nil {
if rangeDelOverlap, err := determineOverlapKeyspanIterator(cmp, bounds, rangeDelIter); rangeDelOverlap || err != nil {
return rangeDelOverlap, err
}
}
if rangeKeyIter != nil {
return determineOverlapKeyspanIterator(cmp, bounds, rangeKeyIter)
}
return false, nil
}
func determineOverlapPointIterator(
cmp base.Compare, bounds base.UserKeyBounds, iter internalIterator,
) (bool, error) {
kv := iter.SeekGE(bounds.Start, base.SeekGEFlagsNone)
if kv == nil {
return false, iter.Error()
}
return bounds.End.IsUpperBoundForInternalKey(cmp, kv.K), nil
}
func determineOverlapKeyspanIterator(
cmp base.Compare, bounds base.UserKeyBounds, iter keyspan.FragmentIterator,
) (bool, error) {
// NB: The spans surfaced by the fragment iterator are non-overlapping.
span, err := iter.SeekGE(bounds.Start)
if err != nil {
return false, err
}
for ; span != nil; span, err = iter.Next() {
if !bounds.End.IsUpperBoundFor(cmp, span.Start) {
// The span starts after our bounds.
return false, nil
}
if !span.Empty() {
return true, nil
}
}
return false, err
}