-
Notifications
You must be signed in to change notification settings - Fork 452
/
iterator.go
3123 lines (2937 loc) · 120 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2011 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"bytes"
"context"
"io"
"math/rand/v2"
"sync"
"unsafe"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangekeystack"
"github.com/cockroachdb/pebble/internal/treeprinter"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/redact"
)
// iterPos describes the state of the internal iterator, in terms of whether it
// is at the position returned to the user (cur), one ahead of the position
// returned (next for forward iteration and prev for reverse iteration). The cur
// position is split into two states, for forward and reverse iteration, since
// we need to differentiate for switching directions.
//
// There is subtlety in what is considered the current position of the Iterator.
// The internal iterator exposes a sequence of internal keys. There is not
// always a single internalIterator position corresponding to the position
// returned to the user. Consider the example:
//
// a.MERGE.9 a.MERGE.8 a.MERGE.7 a.SET.6 b.DELETE.9 b.DELETE.5 b.SET.4
// \ /
// \ Iterator.Key() = 'a' /
//
// The Iterator exposes one valid position at user key 'a' and the two exhausted
// positions at the beginning and end of iteration. The underlying
// internalIterator contains 7 valid positions and 2 exhausted positions.
//
// Iterator positioning methods must set iterPos to iterPosCur{Forward,Reverse}
// iff the user key at the current internalIterator position equals the
// Iterator.Key returned to the user. This guarantees that a call to nextUserKey
// or prevUserKey will advance to the next or previous iterator position.
// iterPosCur{Forward,Reverse} does not make any guarantee about the internal
// iterator position among internal keys with matching user keys, and it will
// vary subtly depending on the particular key kinds encountered. In the above
// example, the iterator returning 'a' may set iterPosCurForward if the internal
// iterator is positioned at any of a.MERGE.9, a.MERGE.8, a.MERGE.7 or a.SET.6.
//
// When setting iterPos to iterPosNext or iterPosPrev, the internal iterator
// must be advanced to the first internalIterator position at a user key greater
// (iterPosNext) or less (iterPosPrev) than the key returned to the user. An
// internalIterator position that's !Valid() must also be considered greater or
// less—depending on the direction of iteration—than the last valid Iterator
// position.
type iterPos int8
const (
// iterPosCurForward: the internal iterator is at the user key returned to
// the user, and that position was reached by forward iteration.
iterPosCurForward iterPos = 0
// iterPosNext: the internal iterator has been advanced to the first
// position with a user key greater than the key returned to the user (or
// is exhausted in the forward direction).
iterPosNext iterPos = 1
// iterPosPrev: the internal iterator has been advanced to the first
// position with a user key less than the key returned to the user (or is
// exhausted in the reverse direction).
iterPosPrev iterPos = -1
// iterPosCurReverse: the internal iterator is at the user key returned to
// the user, and that position was reached by reverse iteration.
iterPosCurReverse iterPos = -2
// For limited iteration. When the iterator is at iterPosCurForwardPaused
// - Next*() call should behave as if the internal iterator is already
// at next (akin to iterPosNext).
// - Prev*() call should behave as if the internal iterator is at the
// current key (akin to iterPosCurForward).
//
// Similar semantics apply to CurReversePaused.
iterPosCurForwardPaused iterPos = 2
iterPosCurReversePaused iterPos = -3
)
// readBytesPeriod is the approximate gap in bytes between samples of data read
// during iteration. It is multiplied by a default ReadSamplingMultiplier of
// 1 << 4 to yield 1 << 20 (1MB). The 1MB factor comes from:
// https://github.com/cockroachdb/pebble/issues/29#issuecomment-494477985
const readBytesPeriod uint64 = 1 << 16
// errReversePrefixIteration is the error value describing an attempt at
// reverse prefix iteration, which is unsupported.
// NOTE(review): the call sites that return it are outside this view.
var errReversePrefixIteration = errors.New("pebble: unsupported reverse prefix iteration")
// IteratorMetrics holds per-iterator metrics. These do not change over the
// lifetime of the iterator.
type IteratorMetrics struct {
// ReadAmp is the read amplification experienced by this iterator. This is
// the sum of the memtables, the L0 sublevels and the non-empty Ln levels.
// Higher read amplification generally results in slower reads, though
// allowing higher read amplification can also result in faster writes.
ReadAmp int
}
// IteratorStatsKind describes the two kinds of iterator stats. Its values are
// used as indexes into the [NumStatsKind]int counter arrays of IteratorStats.
type IteratorStatsKind int8
const (
// InterfaceCall represents calls to Iterator.
InterfaceCall IteratorStatsKind = iota
// InternalIterCall represents calls by Iterator to its internalIterator.
InternalIterCall
// NumStatsKind is the number of kinds, and is used for array sizing.
NumStatsKind
)
// IteratorStats contains iteration stats. Each counter array holds one entry
// per IteratorStatsKind (InterfaceCall, InternalIterCall).
type IteratorStats struct {
// ForwardSeekCount includes SeekGE, SeekPrefixGE, First.
ForwardSeekCount [NumStatsKind]int
// ReverseSeekCount includes SeekLT, Last.
ReverseSeekCount [NumStatsKind]int
// ForwardStepCount includes Next.
ForwardStepCount [NumStatsKind]int
// ReverseStepCount includes Prev.
ReverseStepCount [NumStatsKind]int
// InternalStats holds the miscellaneous stats produced by internal
// iterators (see InternalIteratorStats).
InternalStats InternalIteratorStats
// RangeKeyStats holds stats about range keys encountered by the iterator
// (see RangeKeyIteratorStats).
RangeKeyStats RangeKeyIteratorStats
}
// Compile-time assertion that *IteratorStats implements redact.SafeFormatter.
var _ redact.SafeFormatter = &IteratorStats{}
// InternalIteratorStats contains miscellaneous stats produced by internal
// iterators. It is an alias for base.InternalIteratorStats.
type InternalIteratorStats = base.InternalIteratorStats
// RangeKeyIteratorStats collects counters describing the range keys an
// iterator came across while iterating.
type RangeKeyIteratorStats struct {
	// Count is the number of range keys encountered during iteration. A range
	// key may be counted more than once if the iterator leaves its bounds and
	// later returns.
	Count int
	// ContainedPoints is the number of point keys encountered within the
	// bounds of a range key. Point keys are counted regardless of whether
	// their suffix sorts above or below the covering range key's suffix.
	ContainedPoints int
	// SkippedPoints is the number of ContainedPoints point keys that were
	// skipped during iteration because of range-key masking. Point keys that
	// were never loaded because a RangeKeyMasking.Filter excluded the entire
	// containing block are not included.
	SkippedPoints int
}

// Merge accumulates o into the receiver by adding each of o's counters to the
// matching counter of s. It is intended for summing stats across several
// iterators. The argument is taken by value and is not modified.
func (s *RangeKeyIteratorStats) Merge(o RangeKeyIteratorStats) {
	s.Count, s.ContainedPoints, s.SkippedPoints =
		s.Count+o.Count, s.ContainedPoints+o.ContainedPoints, s.SkippedPoints+o.SkippedPoints
}
// String returns the stats as a string, obtained by passing s to
// redact.StringWithoutMarkers.
func (s *RangeKeyIteratorStats) String() string {
return redact.StringWithoutMarkers(s)
}
// SafeFormat implements the redact.SafeFormatter interface. It prints the three
// counters, each rendered through humanize.Count, on a single line. The verb
// argument is not consulted.
func (s *RangeKeyIteratorStats) SafeFormat(p redact.SafePrinter, verb rune) {
	rangeKeys := humanize.Count.Uint64(uint64(s.Count))
	contained := humanize.Count.Uint64(uint64(s.ContainedPoints))
	skipped := humanize.Count.Uint64(uint64(s.SkippedPoints))
	p.Printf("range keys: %s, contained points: %s (%s skipped)", rangeKeys, contained, skipped)
}
// LazyValue is a lazy value. It is an alias for base.LazyValue; see the long
// comment on that type.
type LazyValue = base.LazyValue
// Iterator iterates over a DB's key/value pairs in key order.
//
// An iterator must be closed after use, but it is not necessary to read an
// iterator until exhaustion.
//
// An iterator is not goroutine-safe, but it is safe to use multiple iterators
// concurrently, with each in a dedicated goroutine.
//
// It is also safe to use an iterator concurrently with modifying its
// underlying DB, if that DB permits modification. However, the resultant
// key/value pairs are not guaranteed to be a consistent snapshot of that DB
// at a particular point in time.
//
// If an iterator encounters an error during any operation, it is stored by
// the Iterator and surfaced through the Error method. All absolute
// positioning methods (eg, SeekLT, SeekGE, First, Last, etc) reset any
// accumulated error before positioning. All relative positioning methods (eg,
// Next, Prev) return without advancing if the iterator has an accumulated
// error.
type Iterator struct {
// The context is stored here since (a) Iterators are expected to be
// short-lived (since they pin memtables and sstables), (b) plumbing a
// context into every method is very painful, (c) they do not (yet) respect
// context cancellation and are only used for tracing.
ctx context.Context
// opts holds the iterator's options. opts.LowerBound and opts.UpperBound
// point into boundsBuf (see below).
opts IterOptions
// merge is called with a user key and its first MERGE operand to obtain
// the ValueMerger used to resolve that key (see mergeForward).
merge Merge
// comparer supplies the Compare, Equal and Split functions used by this
// iterator (see the cmp and equal shorthands).
comparer base.Comparer
// iter is the internal iterator that the positioning code steps; the
// result of its latest positioning call is recorded in iterKV.
iter internalIterator
// NOTE(review): presumably the point-key iterator underneath iter — its
// uses are outside this view; confirm.
pointIter topLevelIterator
// Either readState or version is set, but not both.
readState *readState
version *version
// rangeKey holds iteration state specific to iteration over range keys.
// The range key field may be nil if the Iterator has never been configured
// to iterate over range keys. Its non-nilness cannot be used to determine
// if the Iterator is currently iterating over range keys: For that, consult
// the IterOptions using opts.rangeKeys(). If non-nil, its rangeKeyIter
// field is guaranteed to be non-nil too.
rangeKey *iteratorRangeKeyState
// rangeKeyMasking holds state for range-key masking of point keys.
rangeKeyMasking rangeKeyMasking
// err is the accumulated error, surfaced through the Error method.
err error
// When iterValidityState=IterValid, key represents the current key, which
// is backed by keyBuf.
key []byte
keyBuf []byte
// value is the value associated with key at the current position.
value LazyValue
// For use in LazyValue.Clone.
valueBuf []byte
fetcher base.LazyFetcher
// For use in LazyValue.Value.
lazyValueBuf []byte
// valueCloser, when non-nil, is the closer obtained when a value merge was
// finished; closeValueCloser closes it and resets the field to nil.
valueCloser io.Closer
// boundsBuf holds two buffers used to store the lower and upper bounds.
// Whenever the Iterator's bounds change, the new bounds are copied into
// boundsBuf[boundsBufIdx]. The two bounds share a slice to reduce
// allocations. opts.LowerBound and opts.UpperBound point into this slice.
boundsBuf [2][]byte
boundsBufIdx int
// iterKV reflects the latest position of iter, except when SetBounds is
// called. In that case, it is explicitly set to nil.
iterKV *base.InternalKV
alloc *iterAlloc
getIterAlloc *getIterAlloc
// When hasPrefix is true, prefixOrFullSeekKey is the prefix that key
// prefixes are compared against during prefix iteration.
// NOTE(review): by name it holds the full seek key otherwise — confirm.
prefixOrFullSeekKey []byte
readSampling readSampling
// stats accumulates this iterator's IteratorStats counters.
stats IteratorStats
externalReaders [][]*sstable.Reader
// Following fields used when constructing an iterator stack, eg, in Clone
// and SetOptions or when re-fragmenting a batch's range keys/range dels.
// Non-nil if this Iterator includes a Batch.
batch *Batch
tc *tableCacheContainer
newIters tableNewIters
newIterRangeKey keyspanimpl.TableNewSpanIter
lazyCombinedIter lazyCombinedIter
seqNum base.SeqNum
// batchSeqNum is used by Iterators over indexed batches to detect when the
// underlying batch has been mutated. The batch beneath an indexed batch may
// be mutated while the Iterator is open, but new keys are not surfaced
// until the next call to SetOptions.
batchSeqNum base.SeqNum
// batch{PointIter,RangeDelIter,RangeKeyIter} are used when the Iterator is
// configured to read through an indexed batch. If a batch is set, these
// iterators will be included within the iterator stack regardless of
// whether the batch currently contains any keys of their kind. These
// pointers are used during a call to SetOptions to refresh the Iterator's
// view of its indexed batch.
batchPointIter batchIter
batchRangeDelIter keyspan.Iter
batchRangeKeyIter keyspan.Iter
// merging is a pointer to this iterator's point merging iterator. It
// appears here because key visibility is handled by the merging iterator.
// During SetOptions on an iterator over an indexed batch, this field is
// used to update the merging iterator's batch snapshot.
merging *mergingIter
// Keeping the bools here after all the 8 byte aligned fields shrinks the
// sizeof this struct by 24 bytes.
// INVARIANT:
// iterValidityState==IterAtLimit <=>
// pos==iterPosCurForwardPaused || pos==iterPosCurReversePaused
iterValidityState IterValidityState
// Set to true by SetBounds, SetOptions. Causes the Iterator to appear
// exhausted externally, while preserving the correct iterValidityState for
// the iterator's internal state. Preserving the correct internal validity
// is used for SeekPrefixGE(..., trySeekUsingNext), and SeekGE/SeekLT
// optimizations after "no-op" calls to SetBounds and SetOptions.
requiresReposition bool
// The position of iter. When this is iterPos{Prev,Next} the iter has been
// moved past the current key-value, which can only happen if
// iterValidityState=IterValid, i.e., there is something to return to the
// client for the current position.
pos iterPos
// Relates to the prefixOrFullSeekKey field above.
hasPrefix bool
// Used for deriving the value of SeekPrefixGE(..., trySeekUsingNext),
// and SeekGE/SeekLT optimizations
lastPositioningOp lastPositioningOpKind
// Used for determining when it's safe to perform SeekGE optimizations that
// reuse the iterator state to avoid the cost of a full seek if the iterator
// is already positioned in the correct place. If the iterator's view of its
// indexed batch was just refreshed, some optimizations cannot be applied on
// the first seek after the refresh:
// - SeekGE has a no-op optimization that does not seek on the internal
// iterator at all if the iterator is already in the correct place.
// This optimization cannot be performed if the internal iterator was
// last positioned when the iterator had a different view of an
// underlying batch.
// - Seek[Prefix]GE set flags.TrySeekUsingNext()=true when the seek key is
// greater than the previous operation's seek key, under the expectation
// that the various internal iterators can use their current position to
// avoid a full expensive re-seek. This applies to the batchIter as well.
// However, if the view of the batch was just refreshed, the batchIter's
// position is not useful because it may already be beyond new keys less
// than the seek key. To prevent the use of this optimization in
// batchIter, Seek[Prefix]GE set flags.BatchJustRefreshed()=true if this
// bit is enabled.
batchJustRefreshed bool
// batchOnlyIter is set to true for Batch.NewBatchOnlyIter.
batchOnlyIter bool
// Used in some tests to disable the random disabling of seek optimizations.
forceEnableSeekOpt bool
// Set to true if NextPrefix is not currently permitted. Defaults to false
// in case an iterator never had any bounds.
nextPrefixNotPermittedByUpperBound bool
}
// cmp is a convenience shorthand for the i.comparer.Compare function. It
// returns that function's integer result for (a, b); callers in this file test
// the result against zero.
func (i *Iterator) cmp(a, b []byte) int {
return i.comparer.Compare(a, b)
}
// equal is a convenience shorthand for the i.comparer.Equal function. It
// reports whether the comparer considers a and b equal.
func (i *Iterator) equal(a, b []byte) bool {
return i.comparer.Equal(a, b)
}
// iteratorRangeKeyState holds an iterator's range key iteration state.
type iteratorRangeKeyState struct {
// opts, cmp and split are supplied through init.
opts *IterOptions
cmp base.Compare
split base.Split
// rangeKeyIter holds the range key iterator stack that iterates over the
// merged spans across the entirety of the LSM.
rangeKeyIter keyspan.FragmentIterator
// NOTE(review): presumably the interleaving iterator referred to in the
// comments below — confirm.
iiter keyspan.InterleavingIter
// stale is set to true when the range key state recorded here (in start,
// end and keys) may not be in sync with the current range key at the
// interleaving iterator's current position.
//
// When the interleaving iterator passes over a new span, it invokes the
// SpanChanged hook defined on the `rangeKeyMasking` type, which sets stale
// to true if the span is non-nil.
//
// The parent iterator may not be positioned over the interleaving
// iterator's current position (eg, i.iterPos = iterPos{Next,Prev}), so
// {keys,start,end} are only updated to the new range key during a call to
// Iterator.saveRangeKey.
stale bool
// updated is used to signal to the Iterator client whether the state of
// range keys has changed since the previous iterator position through the
// `RangeKeyChanged` method. It's set to true during an Iterator positioning
// operation that changes the state of the current range key. Each Iterator
// positioning operation sets it back to false before executing.
//
// TODO(jackson): The lifecycle of {stale,updated,prevPosHadRangeKey} is
// intricate and confusing. Try to refactor to reduce complexity.
updated bool
// prevPosHadRangeKey records whether the previous Iterator position had a
// range key (HasPointAndRange() = (_, true)). It's updated at the beginning
// of each new Iterator positioning operation. It's required by saveRangeKey
// to set `updated` appropriately: Without this record of the previous iterator
// state, it's ambiguous whether an iterator only temporarily stepped onto a
// position without a range key.
prevPosHadRangeKey bool
// rangeKeyOnly is set to true if at the current iterator position there is
// no point key, only a range key start boundary.
rangeKeyOnly bool
// hasRangeKey is true when the current iterator position has a covering
// range key (eg, a range key with bounds [<lower>,<upper>) such that
// <lower> ≤ Key() < <upper>).
hasRangeKey bool
// start and end are the [start, end) boundaries of the current range keys.
start []byte
end []byte
// rangeKeyBuffers is embedded; it provides the keys slice and the buffers
// that back the saved range-key data.
rangeKeyBuffers
// iterConfig holds fields that are used for the construction of the
// iterator stack, but do not need to be directly accessed during iteration.
// This struct is bundled within the iteratorRangeKeyState struct to reduce
// allocations.
iterConfig rangekeystack.UserIteratorConfig
}
// rangeKeyBuffers groups the buffers that hold saved range-key data. Its
// PrepareForReuse method trims them before the struct is reused.
type rangeKeyBuffers struct {
// keys is sorted by Suffix ascending.
keys []RangeKeyData
// buf is used to save range-key data before moving the range-key iterator.
// Start and end boundaries, suffixes and values are all copied into buf.
buf bytealloc.A
// internal holds buffers used by the range key internal iterators.
internal rangekeystack.Buffers
}
// PrepareForReuse readies the buffers for reuse, releasing the ones that have
// grown overly large instead of holding on to them.
func (b *rangeKeyBuffers) PrepareForReuse() {
	// Release the keys slice once it holds more than maxKeysReuse entries; a
	// shorter slice is left exactly as it is.
	const maxKeysReuse = 100
	if n := len(b.keys); n > maxKeysReuse {
		b.keys = nil
	}

	// Keep the byte buffer's backing storage (truncated to zero length) only
	// while its capacity is below maxKeyBufCacheSize; otherwise release it.
	// The cutoff is fairly arbitrary.
	if cap(b.buf) < maxKeyBufCacheSize {
		b.buf = b.buf[:0]
	} else {
		b.buf = nil
	}

	b.internal.PrepareForReuse()
}
// init records the comparison function, split function and iterator options
// on the range key state. No other field is touched.
func (i *iteratorRangeKeyState) init(cmp base.Compare, split base.Split, opts *IterOptions) {
i.cmp = cmp
i.split = split
i.opts = opts
}
// iterRangeKeyStateAllocPool is a pool of *iteratorRangeKeyState values. When
// the pool has nothing to hand out, New supplies a pointer to a zero-valued
// iteratorRangeKeyState.
var iterRangeKeyStateAllocPool = sync.Pool{
	New: func() any {
		return &iteratorRangeKeyState{}
	},
}
// isEphemeralPosition reports whether the current iterator position is
// ephemeral, i.e. one that subsequent relative positioning operations will not
// visit again.
//
// Such a position arises when a SeekGE or SeekPrefixGE lands on a straddling
// range key that has no coincident point key: the iterator is at a
// range-key-only position whose key is not the range key's start boundary.
func (i *Iterator) isEphemeralPosition() bool {
	// Only a range-key-only position can be ephemeral.
	if !i.opts.rangeKeys() || i.rangeKey == nil || !i.rangeKey.rangeKeyOnly {
		return false
	}
	return !i.equal(i.rangeKey.start, i.key)
}
// lastPositioningOpKind is the type of Iterator.lastPositioningOp, which is
// used for deriving seek optimizations.
type lastPositioningOpKind int8
const (
// unknownLastPositionOp is the zero value.
unknownLastPositionOp lastPositioningOpKind = iota
// NOTE(review): by name, the next three values record that the last
// positioning operation was SeekPrefixGE, SeekGE or SeekLT respectively;
// the places that assign them are outside this view — confirm.
seekPrefixGELastPositioningOp
seekGELastPositioningOp
seekLTLastPositioningOp
// internalNextOp is a special internal iterator positioning operation used
// by CanDeterministicallySingleDelete. It exists for enforcing requirements
// around calling CanDeterministicallySingleDelete at most once per external
// iterator position.
internalNextOp
)
// Limited iteration mode. Not for use with prefix iteration.
//
// SeekGE, SeekLT, Prev, Next have WithLimit variants, that pause the iterator
// at the limit in a best-effort manner. The client should behave correctly
// even if the limits are ignored. These limits are not "deep", in that they
// are not passed down to the underlying collection of internalIterators. This
// is because the limits are transient, and apply only until the next
// iteration call. They serve mainly as a way to bound the amount of work when
// two (or more) Iterators are being coordinated at a higher level.
//
// In limited iteration mode:
// - Avoid using Iterator.Valid if the last call was to a *WithLimit() method.
// The return value from the *WithLimit() method provides a more precise
// disposition.
// - The limit is exclusive for forward and inclusive for reverse.
//
//
// Limited iteration mode & range keys
//
// Limited iteration interacts with range-key iteration. When range key
// iteration is enabled, range keys are interleaved at their start boundaries.
// Limited iteration must ensure that if a range key exists within the limit,
// the iterator visits the range key.
//
// During forward limited iteration, this is trivial: An overlapping range key
// must have a start boundary less than the limit, and the range key's start
// boundary will be interleaved and found to be within the limit.
//
// During reverse limited iteration, the tail of the range key may fall within
// the limit. The range key must be surfaced even if the range key's start
// boundary is less than the limit, and if there are no point keys between the
// current iterator position and the limit. To provide this guarantee, reverse
// limited iteration ignores the limit as long as there is a range key
// overlapping the iteration position.
// IterValidityState captures the state of the Iterator: exhausted, valid, or
// paused at a limit.
type IterValidityState int8
const (
// IterExhausted represents an Iterator that is exhausted.
IterExhausted IterValidityState = iota
// IterValid represents an Iterator that is valid.
IterValid
// IterAtLimit represents an Iterator that has a non-exhausted
// internalIterator, but has reached a limit without any key for the
// caller. The Iterator is in this state exactly when its position is
// iterPosCurForwardPaused or iterPosCurReversePaused.
IterAtLimit
)
// readSampling stores variables used to sample a read to trigger a read
// compaction
type readSampling struct {
// NOTE(review): presumably the number of bytes still to be read before the
// next sample is taken — confirm against Iterator.maybeSampleRead.
bytesUntilReadSampling uint64
initialSamplePassed bool
pendingCompactions readCompactionQueue
// forceReadSampling is used for testing purposes to force a read sample on every
// call to Iterator.maybeSampleRead()
forceReadSampling bool
}
// findNextEntry scans forward from the internal iterator's current position
// (i.iterKV) for the next entry to surface to the user, and records the
// outcome in i.iterValidityState, i.pos, i.key and i.value.
//
// If limit is non-nil and the user key under the internal iterator is >= limit,
// the scan pauses: iterValidityState becomes IterAtLimit and pos becomes
// iterPosCurForwardPaused.
//
// On return iterValidityState is IterValid if an entry was found,
// IterAtLimit if paused, and IterExhausted otherwise. The exhausted case covers
// running off the end, a range key outside the iteration prefix, and errors;
// errors are stored in i.err.
func (i *Iterator) findNextEntry(limit []byte) {
// Start from a clean slate; the state is only upgraded once an entry is
// found or the limit is hit.
i.iterValidityState = IterExhausted
i.pos = iterPosCurForward
if i.opts.rangeKeys() && i.rangeKey != nil {
i.rangeKey.rangeKeyOnly = false
}
// Close the closer for the current value if one was open.
if i.closeValueCloser() != nil {
return
}
for i.iterKV != nil {
key := i.iterKV.K
// The topLevelIterator.StrictSeekPrefixGE contract requires that in
// prefix mode [i.hasPrefix=t], every point key returned by the internal
// iterator must have the current iteration prefix.
if invariants.Enabled && i.hasPrefix {
// Range keys are an exception to the contract and may return a different
// prefix. This case is explicitly handled in the switch statement below.
if key.Kind() != base.InternalKeyKindRangeKeySet {
if p := i.comparer.Split.Prefix(key.UserKey); !i.equal(i.prefixOrFullSeekKey, p) {
i.opts.logger.Fatalf("pebble: prefix violation: key %q does not have prefix %q\n", key.UserKey, i.prefixOrFullSeekKey)
}
}
}
// Compare with limit every time we start at a different user key.
// Note that given the best-effort contract of limit, we could avoid a
// comparison in the common case by doing this only after
// i.nextUserKey is called for the deletes below. However that makes
// the behavior non-deterministic (since the behavior will vary based
// on what has been compacted), which makes it hard to test with the
// metamorphic test. So we forego that performance optimization.
if limit != nil && i.cmp(limit, i.iterKV.K.UserKey) <= 0 {
i.iterValidityState = IterAtLimit
i.pos = iterPosCurForwardPaused
return
}
// If the user has configured a SkipPoint function, invoke it to see
// whether we should skip over the current user key.
if i.opts.SkipPoint != nil && key.Kind() != InternalKeyKindRangeKeySet && i.opts.SkipPoint(i.iterKV.K.UserKey) {
// NB: We could call nextUserKey, but in some cases the SkipPoint
// predicate function might be cheaper than nextUserKey's key copy
// and key comparison. This should be the case for MVCC suffix
// comparisons, for example. In the future, we could expand the
// SkipPoint interface to give the implementor more control over
// whether we skip over just the internal key, the user key, or even
// the key prefix.
i.stats.ForwardStepCount[InternalIterCall]++
i.iterKV = i.iter.Next()
continue
}
switch key.Kind() {
case InternalKeyKindRangeKeySet:
// In prefix mode, a range key whose start has a different prefix ends
// the scan with the iterator exhausted.
if i.hasPrefix {
if p := i.comparer.Split.Prefix(key.UserKey); !i.equal(i.prefixOrFullSeekKey, p) {
return
}
}
// Save the current key.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
i.value = LazyValue{}
// There may also be a live point key at this userkey that we have
// not yet read. We need to find the next entry with this user key
// to find it. Save the range key so we don't lose it when we Next
// the underlying iterator.
i.saveRangeKey()
pointKeyExists := i.nextPointCurrentUserKey()
if i.err != nil {
i.iterValidityState = IterExhausted
return
}
i.rangeKey.rangeKeyOnly = !pointKeyExists
i.iterValidityState = IterValid
return
case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
// NB: treating InternalKeyKindSingleDelete as equivalent to DEL is not
// only simpler, but is also necessary for correctness due to
// InternalKeyKindSSTableInternalObsoleteBit.
i.nextUserKey()
continue
case InternalKeyKindSet, InternalKeyKindSetWithDelete:
// A live point key: copy the key, take the value and surface it.
i.keyBuf = append(i.keyBuf[:0], key.UserKey...)
i.key = i.keyBuf
i.value = i.iterKV.V
i.iterValidityState = IterValid
i.saveRangeKey()
return
case InternalKeyKindMerge:
// Resolving the merge may advance us to the next point key, which
// may be covered by a different set of range keys. Save the range
// key state so we don't lose it.
i.saveRangeKey()
if i.mergeForward(key) {
i.iterValidityState = IterValid
return
}
// The merge didn't yield a valid key, either because the value
// merger indicated it should be deleted, or because an error was
// encountered.
i.iterValidityState = IterExhausted
if i.err != nil {
return
}
// The merged key was deleted: make sure the internal iterator is on the
// next user key before the loop continues.
if i.pos != iterPosNext {
i.nextUserKey()
}
if i.closeValueCloser() != nil {
return
}
i.pos = iterPosCurForward
default:
i.err = base.CorruptionErrorf("pebble: invalid internal key kind: %d", errors.Safe(key.Kind()))
i.iterValidityState = IterExhausted
return
}
}
// Is iterKV nil due to an error?
if err := i.iter.Error(); err != nil {
i.err = err
i.iterValidityState = IterExhausted
}
}
// nextPointCurrentUserKey steps the internal iterator forward once, looking for
// a live point key at the user key already saved in i.key. It returns true if
// one was found, in which case i.value holds its value.
//
// It returns false when the user's SkipPoint function rejects i.key (without
// stepping), when the next internal key belongs to a different user key or the
// iterator is exhausted (i.pos is set to iterPosNext), when the next key is a
// deletion, or when an error occurred (i.err is set).
func (i *Iterator) nextPointCurrentUserKey() bool {
	// If SkipPoint would skip the current user key there is no reason to step
	// forward: any point key found would be skipped anyway.
	if skip := i.opts.SkipPoint; skip != nil && skip(i.key) {
		return false
	}

	i.pos = iterPosCurForward
	i.iterKV = i.iter.Next()
	i.stats.ForwardStepCount[InternalIterCall]++

	if i.iterKV == nil {
		// Either the internal iterator failed, or it is exhausted.
		err := i.iter.Error()
		if err == nil {
			i.pos = iterPosNext
			return false
		}
		i.err = err
		return false
	}
	if !i.equal(i.key, i.iterKV.K.UserKey) {
		// The step landed on a later user key.
		i.pos = iterPosNext
		return false
	}

	switch ikey := i.iterKV.K; ikey.Kind() {
	case InternalKeyKindSet, InternalKeyKindSetWithDelete:
		i.value = i.iterKV.V
		return true
	case InternalKeyKindMerge:
		return i.mergeForward(ikey)
	case InternalKeyKindDelete, InternalKeyKindSingleDelete, InternalKeyKindDeleteSized:
		// NB: handling InternalKeyKindSingleDelete exactly like DEL is simpler
		// and also required for correctness because of
		// InternalKeyKindSSTableInternalObsoleteBit.
		return false
	case InternalKeyKindRangeKeySet:
		// A RangeKeySet is always interleaved as the first internal key of a
		// user key, so seeing one here means corruption.
		i.err = base.CorruptionErrorf("pebble: unexpected range key set mid-user key")
		return false
	default:
		i.err = base.CorruptionErrorf("pebble: invalid internal key kind: %d", errors.Safe(ikey.Kind()))
		return false
	}
}
// mergeForward resolves a MERGE key, advancing the underlying iterator forward
// to merge with subsequent keys with the same userkey. mergeForward returns a
// boolean indicating whether or not the merge yielded a valid key. A merge may
// not yield a valid key if an error occurred, in which case i.err is non-nil,
// or the user's value merger specified the key to be deleted.
//
// key is the internal key of the MERGE entry at the internal iterator's
// current position (i.iterKV). On success the merged value is stored in
// i.value and any closer produced by the merger in i.valueCloser.
//
// mergeForward does not update iterValidityState.
func (i *Iterator) mergeForward(key base.InternalKey) (valid bool) {
// Fetch the value of the first MERGE operand.
var iterValue []byte
iterValue, _, i.err = i.iterKV.Value(nil)
if i.err != nil {
return false
}
// Start a value merger seeded with that operand.
var valueMerger ValueMerger
valueMerger, i.err = i.merge(key.UserKey, iterValue)
if i.err != nil {
return false
}
// mergeNext reports failure through i.err.
i.mergeNext(key, valueMerger)
if i.err != nil {
return false
}
var needDelete bool
var value []byte
value, needDelete, i.valueCloser, i.err = finishValueMerger(
valueMerger, true /* includesBase */)
// NB: i.value is assigned before the error check, so it is overwritten even
// when finishValueMerger fails.
i.value = base.MakeInPlaceValue(value)
if i.err != nil {
return false
}
if needDelete {
// The merged result is a deletion: release the closer right away. Its
// result is discarded here; closeValueCloser also stores it in i.err.
_ = i.closeValueCloser()
return false
}
return true
}
// closeValueCloser releases the pending value closer, if there is one. The
// result of Close is stored in i.err and the closer is cleared. The method
// returns i.err, which is whatever error was already recorded when no closer
// was pending.
func (i *Iterator) closeValueCloser() error {
	if i.valueCloser == nil {
		return i.err
	}
	i.err = i.valueCloser.Close()
	i.valueCloser = nil
	return i.err
}
// nextUserKey advances the internal iterator until it has left the user key
// it is currently positioned at, or until the internal iterator returns no
// entry. It is a no-op when the iterator has no current entry. An error
// reported by the internal iterator is recorded in i.err.
//
// If the iterator is not currently valid, the current entry's user key is
// first copied into i.keyBuf (and i.key) so that subsequent entries can be
// compared against it.
func (i *Iterator) nextUserKey() {
	cur := i.iterKV
	if cur == nil {
		return
	}

	prevTrailer := cur.K.Trailer
	lastVersion := prevTrailer <= base.InternalKeyZeroSeqnumMaxTrailer
	if i.iterValidityState != IterValid {
		i.keyBuf = append(i.keyBuf[:0], cur.K.UserKey...)
		i.key = i.keyBuf
	}

	for {
		i.stats.ForwardStepCount[InternalIterCall]++
		i.iterKV = i.iter.Next()
		if i.iterKV == nil {
			// No entry came back: record the error, if there is one, and stop
			// either way.
			if err := i.iter.Error(); err != nil {
				i.err = err
			}
			return
		}

		// NB: The key comparison can be skipped in two situations, both of
		// which guarantee we have reached the next user key: the previous key
		// had a zero sequence number (lastVersion), or the new key's trailer
		// is greater than or equal to the previous key's trailer. This holds
		// because internal keys sharing a user key are sorted by
		// InternalKeyTrailer in strictly monotonically descending order. We
		// expect the trailer optimization to trigger around 50% of the time
		// with randomly distributed writes, and very frequently when
		// iterating through ingested sstables, whose keys all share the same
		// sequence number.
		nextTrailer := i.iterKV.K.Trailer
		if lastVersion || nextTrailer >= prevTrailer || !i.equal(i.key, i.iterKV.K.UserKey) {
			return
		}

		// Still on the same user key; remember this entry's trailer for the
		// next round.
		prevTrailer = nextTrailer
		lastVersion = nextTrailer <= base.InternalKeyZeroSeqnumMaxTrailer
	}
}
// maybeSampleRead charges the size of the current key and value against the
// iterator's read-sampling budget and calls sampleRead whenever that budget
// is used up. It does nothing unless the iterator is valid and has a read
// state.
func (i *Iterator) maybeSampleRead() {
	// This method is only called when a public method of Iterator is
	// returning, and the check below excludes the case where the iterator is
	// paused at a limit. As a consequence, keys that are deleted but still
	// encountered during iteration are not counted by read sampling and will
	// not trigger read-driven compactions, even though iterating over them
	// has a cost. The issue is not limited to Iterator, which does not see
	// the effect of range deletes that may be causing iteration work in
	// mergingIter. It is not clear at this time whether this is a deficiency
	// worth addressing.
	if i.iterValidityState != IterValid || i.readState == nil {
		return
	}
	if i.readSampling.forceReadSampling {
		i.sampleRead()
		return
	}

	// NOTE(review): the int64 product is narrowed to int32 here, so a
	// sufficiently large multiplier would wrap — confirm the option is
	// bounded by its callers.
	samplingPeriod := int32(int64(readBytesPeriod) * i.readState.db.opts.Experimental.ReadSamplingMultiplier)
	if samplingPeriod <= 0 {
		return
	}
	// Budget refills are drawn uniformly from [0, 2*samplingPeriod).
	refillWindow := 2 * uint32(samplingPeriod)

	bytesRead := uint64(len(i.key) + i.value.Len())
	for i.readSampling.bytesUntilReadSampling < bytesRead {
		i.readSampling.bytesUntilReadSampling += uint64(rand.Uint32N(refillWindow))

		// Adjust for the first read of a newly-opened iterator. Because
		// bytesUntilReadSampling starts at zero, every new iterator would
		// otherwise sample its first read; we only want to sample some of
		// them.
		if !i.readSampling.initialSamplePassed {
			i.readSampling.initialSamplePassed = true
			budget := i.readSampling.bytesUntilReadSampling
			if budget > bytesRead && rand.Uint64N(budget) > bytesRead {
				continue
			}
		}
		i.sampleRead()
	}
	i.readSampling.bytesUntilReadSampling -= bytesRead
}
// sampleRead records one sampled read against the first file, in level
// iterator visiting order, whose point-key bounds cover the current key, but
// only when at least one further level also has such a file. Each sampled
// read decrements that file's AllowedSeeks; when the counter reaches zero it
// is reset by InitAllowedSeeks and a read compaction for the file is added to
// the iterator's pending compactions.
func (i *Iterator) sampleRead() {
	mi := i.merging
	if mi == nil {
		return
	}

	var (
		topFile       *manifest.FileMetadata
		topLevel      = numLevels
		overlapLevels = 0
	)

	if len(mi.levels) > 1 {
		mi.ForEachLevelIter(func(li *levelIter) (done bool) {
			if li.layer.IsFlushableIngests() {
				return false
			}
			level := li.layer.Level()
			file := li.iterFile
			if file == nil {
				return false
			}

			// Only one bound is checked, chosen by the iteration direction.
			var coversKey bool
			switch i.pos {
			case iterPosNext, iterPosCurForward, iterPosCurForwardPaused:
				coversKey = i.cmp(file.SmallestPointKey.UserKey, i.key) <= 0
			case iterPosPrev, iterPosCurReverse, iterPosCurReversePaused:
				coversKey = i.cmp(file.LargestPointKey.UserKey, i.key) >= 0
			}

			// Do nothing if the current key lies outside the file's bounds.
			// We could seek the LevelIterator at this level to locate the
			// right file, but the performance cost of doing so is large
			// enough to negate the benefits of read sampling in the first
			// place. See the discussion at:
			// https://github.com/cockroachdb/pebble/pull/1041#issuecomment-763226492
			if !coversKey {
				return false
			}

			overlapLevels++
			if overlapLevels >= 2 {
				// Two overlapping levels are enough; stop visiting levels.
				return true
			}
			// First overlapping level seen: remember its file.
			topLevel, topFile = level, file
			return false
		})
	}

	if topFile == nil || topLevel >= numLevels || overlapLevels < 2 {
		return
	}
	if topFile.AllowedSeeks.Add(-1) != 0 {
		return
	}

	// Since the compaction queue can handle duplicates, we can keep adding to
	// the queue even once allowedSeeks hits 0. In fact, we NEED to keep
	// adding to the queue, because the queue is small and evicts older and
	// possibly useful compactions.
	topFile.AllowedSeeks.Add(topFile.InitAllowedSeeks)
	i.readSampling.pendingCompactions.add(&readCompaction{
		start:   topFile.SmallestPointKey.UserKey,
		end:     topFile.LargestPointKey.UserKey,
		level:   topLevel,
		fileNum: topFile.FileNum,
	}, i.cmp)
}
func (i *Iterator) findPrevEntry(limit []byte) {
i.iterValidityState = IterExhausted
i.pos = iterPosCurReverse
if i.opts.rangeKeys() && i.rangeKey != nil {
i.rangeKey.rangeKeyOnly = false
}
// Close the closer for the current value if one was open.
if i.valueCloser != nil {
i.err = i.valueCloser.Close()
i.valueCloser = nil
if i.err != nil {
i.iterValidityState = IterExhausted
return
}
}
var valueMerger ValueMerger
firstLoopIter := true
rangeKeyBoundary := false
// The code below compares with limit in multiple places. As documented in
// findNextEntry, this is being done to make the behavior of limit
// deterministic to allow for metamorphic testing. It is not required by
// the best-effort contract of limit.
for i.iterKV != nil {
key := i.iterKV.K
// NB: We cannot pause if the current key is covered by a range key.
// Otherwise, the user might not ever learn of a range key that covers
// the key space being iterated over in which there are no point keys.
// Since limits are best effort, ignoring the limit in this case is
// allowed by the contract of limit.
if firstLoopIter && limit != nil && i.cmp(limit, i.iterKV.K.UserKey) > 0 && !i.rangeKeyWithinLimit(limit) {
i.iterValidityState = IterAtLimit
i.pos = iterPosCurReversePaused
return
}
firstLoopIter = false
if i.iterValidityState == IterValid {
if !i.equal(key.UserKey, i.key) {
// We've iterated to the previous user key.
i.pos = iterPosPrev
if valueMerger != nil {
var needDelete bool
var value []byte
value, needDelete, i.valueCloser, i.err = finishValueMerger(valueMerger, true /* includesBase */)
i.value = base.MakeInPlaceValue(value)
if i.err == nil && needDelete {
// The point key at this key is deleted. If we also have
// a range key boundary at this key, we still want to
// return. Otherwise, we need to continue looking for
// a live key.
i.value = LazyValue{}
if rangeKeyBoundary {
i.rangeKey.rangeKeyOnly = true
} else {
i.iterValidityState = IterExhausted
if i.closeValueCloser() == nil {
continue
}
}
}
}
if i.err != nil {
i.iterValidityState = IterExhausted
}
return
}
}
// If the user has configured a SkipPoint function, invoke it to see
// whether we should skip over the current user key.
if i.opts.SkipPoint != nil && key.Kind() != InternalKeyKindRangeKeySet && i.opts.SkipPoint(key.UserKey) {
// NB: We could call prevUserKey, but in some cases the SkipPoint
// predicate function might be cheaper than prevUserKey's key copy
// and key comparison. This should be the case for MVCC suffix
// comparisons, for example. In the future, we could expand the
// SkipPoint interface to give the implementor more control over
// whether we skip over just the internal key, the user key, or even
// the key prefix.
i.stats.ReverseStepCount[InternalIterCall]++
i.iterKV = i.iter.Prev()
if i.iterKV == nil {
if err := i.iter.Error(); err != nil {
i.err = err
i.iterValidityState = IterExhausted
return
}
}
if limit != nil && i.iterKV != nil && i.cmp(limit, i.iterKV.K.UserKey) > 0 && !i.rangeKeyWithinLimit(limit) {
i.iterValidityState = IterAtLimit
i.pos = iterPosCurReversePaused
return
}
continue
}
switch key.Kind() {
case InternalKeyKindRangeKeySet:
// Range key start boundary markers are interleaved with the maximum
// sequence number, so if there's a point key also at this key, we
// must've already iterated over it.