diff --git a/tsdb/db.go b/tsdb/db.go index 2436fab2ace..8fea0fa6ccb 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1510,7 +1510,7 @@ func (db *DB) reloadBlocks() (err error) { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) + level.Debug(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) } // Append blocks to old, deletable blocks, so we can close them. diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 222a8b0d6f2..0162a41209f 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/bboreham/go-loser" + "github.com/cespare/xxhash/v2" "golang.org/x/exp/slices" "github.com/prometheus/prometheus/model/labels" @@ -49,46 +50,63 @@ var ensureOrderBatchPool = sync.Pool{ }, } +const shardCount = 256 + +type memPostingMap struct { + mtx *sync.RWMutex + series map[string]map[string][]storage.SeriesRef +} + // MemPostings holds postings list for series ID per label pair. They may be written // to out of order. // EnsureOrder() must be called once before any reads are done. This allows for quick // unordered batch fills on startup. type MemPostings struct { - mtx sync.RWMutex - m map[string]map[string][]storage.SeriesRef + m [shardCount]memPostingMap ordered bool } // NewMemPostings returns a memPostings that's ready for reads and writes. func NewMemPostings() *MemPostings { - return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + p := MemPostings{ + m: [shardCount]memPostingMap{}, ordered: true, } + for i := 0; i < shardCount; i++ { + p.m[i].mtx = &sync.RWMutex{} + p.m[i].series = make(map[string]map[string][]storage.SeriesRef, 512) + } + return &p } // NewUnorderedMemPostings returns a memPostings that is not safe to be read from // until EnsureOrder() was called once. func NewUnorderedMemPostings() *MemPostings { - return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + p := MemPostings{ + m: [shardCount]memPostingMap{}, ordered: false, } + for i := 0; i < shardCount; i++ { + p.m[i].mtx = &sync.RWMutex{} + p.m[i].series = make(map[string]map[string][]storage.SeriesRef, 512) + } + return &p } // Symbols returns an iterator over all unique name and value strings, in order. func (p *MemPostings) Symbols() StringIter { - p.mtx.RLock() - // Add all the strings to a map to de-duplicate. symbols := make(map[string]struct{}, 512) - for n, e := range p.m { - symbols[n] = struct{}{} - for v := range e { - symbols[v] = struct{}{} + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + symbols[n] = struct{}{} + for v := range e { + symbols[v] = struct{}{} + } } + shard.mtx.RUnlock() } - p.mtx.RUnlock() res := make([]string, 0, len(symbols)) for k := range symbols { @@ -101,15 +119,17 @@ func (p *MemPostings) Symbols() StringIter { // SortedKeys returns a list of sorted label keys of the postings. 
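Note on the hunks above: MemPostings' single RWMutex plus map is replaced by a fixed array of 256 shards, each with its own lock, so writers touching different label names no longer contend on one mutex. The following standalone sketch illustrates that pattern only; the names shardedIndex, shard and shardFor are invented for the example and do not exist in the Prometheus code, and the value type is simplified to a flat slice.

package main

import (
	"fmt"
	"sync"

	"github.com/cespare/xxhash/v2"
)

const shardCount = 256

// shard plays the role of memPostingMap: one lock guarding one slice of the keyspace.
type shard struct {
	mtx    sync.RWMutex
	series map[string][]uint64
}

type shardedIndex struct {
	shards [shardCount]*shard
}

func newShardedIndex() *shardedIndex {
	idx := &shardedIndex{}
	for i := range idx.shards {
		idx.shards[i] = &shard{series: map[string][]uint64{}}
	}
	return idx
}

// shardFor hashes the key so that all operations on the same label name
// land on, and contend only for, a single shard's lock.
func (idx *shardedIndex) shardFor(key string) *shard {
	return idx.shards[xxhash.Sum64String(key)%shardCount]
}

func (idx *shardedIndex) add(key string, ref uint64) {
	s := idx.shardFor(key)
	s.mtx.Lock()
	s.series[key] = append(s.series[key], ref)
	s.mtx.Unlock()
}

func (idx *shardedIndex) get(key string) []uint64 {
	s := idx.shardFor(key)
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.series[key]
}

func main() {
	idx := newShardedIndex()
	idx.add("job", 1)
	idx.add("job", 2)
	fmt.Println(idx.get("job")) // [1 2]
}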
func (p *MemPostings) SortedKeys() []labels.Label { - p.mtx.RLock() - keys := make([]labels.Label, 0, len(p.m)) + keys := make([]labels.Label, 0, shardCount) - for n, e := range p.m { - for v := range e { - keys = append(keys, labels.Label{Name: n, Value: v}) + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + for v := range e { + keys = append(keys, labels.Label{Name: n, Value: v}) + } } + shard.mtx.RUnlock() } - p.mtx.RUnlock() slices.SortFunc(keys, func(a, b labels.Label) int { nameCompare := strings.Compare(a.Name, b.Name) @@ -125,29 +145,28 @@ func (p *MemPostings) SortedKeys() []labels.Label { // LabelNames returns all the unique label names. func (p *MemPostings) LabelNames() []string { - p.mtx.RLock() - defer p.mtx.RUnlock() - n := len(p.m) - if n == 0 { - return nil - } - - names := make([]string, 0, n-1) - for name := range p.m { - if name != allPostingsKey.Name { - names = append(names, name) + names := make([]string, 0, shardCount) + for _, shard := range p.m { + shard.mtx.RLock() + for name := range shard.series { + if name != allPostingsKey.Name { + names = append(names, name) + } } + shard.mtx.RUnlock() } return names } // LabelValues returns label values for the given name. func (p *MemPostings) LabelValues(_ context.Context, name string) []string { - p.mtx.RLock() - defer p.mtx.RUnlock() + slot := xxhash.Sum64String(name) % shardCount - values := make([]string, 0, len(p.m[name])) - for v := range p.m[name] { + p.m[slot].mtx.RLock() + defer p.m[slot].mtx.RUnlock() + + values := make([]string, 0, len(p.m[slot].series[name])) + for v := range p.m[slot].series[name] { values = append(values, v) } return values @@ -165,7 +184,6 @@ type PostingsStats struct { // Stats calculates the cardinality statistics from postings. func (p *MemPostings) Stats(label string, limit int) *PostingsStats { var size uint64 - p.mtx.RLock() metrics := &maxHeap{} labels := &maxHeap{} @@ -178,26 +196,28 @@ func (p *MemPostings) Stats(label string, limit int) *PostingsStats { labelValueLength.init(limit) labelValuePairs.init(limit) - for n, e := range p.m { - if n == "" { - continue - } - labels.push(Stat{Name: n, Count: uint64(len(e))}) - numLabelPairs += len(e) - size = 0 - for name, values := range e { - if n == label { - metrics.push(Stat{Name: name, Count: uint64(len(values))}) + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + if n == "" { + continue } - seriesCnt := uint64(len(values)) - labelValuePairs.push(Stat{Name: n + "=" + name, Count: seriesCnt}) - size += uint64(len(name)) * seriesCnt + labels.push(Stat{Name: n, Count: uint64(len(e))}) + numLabelPairs += len(e) + size = 0 + for name, values := range e { + if n == label { + metrics.push(Stat{Name: name, Count: uint64(len(values))}) + } + seriesCnt := uint64(len(values)) + labelValuePairs.push(Stat{Name: n + "=" + name, Count: seriesCnt}) + size += uint64(len(name)) * seriesCnt + } + labelValueLength.push(Stat{Name: n, Count: size}) } - labelValueLength.push(Stat{Name: n, Count: size}) + shard.mtx.RUnlock() } - p.mtx.RUnlock() - return &PostingsStats{ CardinalityMetricsStats: metrics.get(), CardinalityLabelStats: labels.get(), @@ -210,12 +230,13 @@ func (p *MemPostings) Stats(label string, limit int) *PostingsStats { // Get returns a postings list for the given label pair. 
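The methods in this hunk fall into two shapes: SortedKeys, LabelNames and Stats walk every shard and hold each shard's read lock only while scanning it, so the result is no longer one globally consistent snapshot, while LabelValues and Get hash the label name straight to its slot. Continuing the hypothetical shardedIndex sketch above (same invented types), a whole-index read looks roughly like this:

// keys collects every key in the index. Each shard is read-locked only while
// it is being scanned, so writers to the other 255 shards are never blocked;
// the trade-off is that the result is not a point-in-time snapshot.
func (idx *shardedIndex) keys() []string {
	out := make([]string, 0, shardCount)
	for _, s := range idx.shards {
		s.mtx.RLock()
		for k := range s.series {
			out = append(out, k)
		}
		s.mtx.RUnlock()
	}
	return out
}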
func (p *MemPostings) Get(name, value string) Postings { var lp []storage.SeriesRef - p.mtx.RLock() - l := p.m[name] + slot := xxhash.Sum64String(name) % shardCount + p.m[slot].mtx.RLock() + l := p.m[slot].series[name] if l != nil { lp = l[value] } - p.mtx.RUnlock() + p.m[slot].mtx.RUnlock() if lp == nil { return EmptyPostings() @@ -234,9 +255,6 @@ func (p *MemPostings) All() Postings { // CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used. // GOMAXPROCS was the default before introducing this parameter. func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { - p.mtx.Lock() - defer p.mtx.Unlock() - if p.ordered { return } @@ -265,15 +283,19 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { } nextJob := ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) - for _, e := range p.m { - for _, l := range e { - *nextJob = append(*nextJob, l) - - if len(*nextJob) >= ensureOrderBatchSize { - workc <- nextJob - nextJob = ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) + for _, shard := range p.m { + shard.mtx.Lock() + for _, e := range shard.series { + for _, l := range e { + *nextJob = append(*nextJob, l) + + if len(*nextJob) >= ensureOrderBatchSize { + workc <- nextJob + nextJob = ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) + } } } + shard.mtx.Unlock() } // If the last job was partially filled, we need to push it to workers too. @@ -293,108 +315,112 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { // Collect all keys relevant for deletion once. New keys added afterwards // can by definition not be affected by any of the given deletes. - p.mtx.RLock() - for n := range p.m { - keys = append(keys, n) + for _, shard := range p.m { + shard.mtx.RLock() + for n := range shard.series { + keys = append(keys, n) + } + shard.mtx.RUnlock() } - p.mtx.RUnlock() for _, n := range keys { - p.mtx.RLock() + slot := xxhash.Sum64String(n) % shardCount + p.m[slot].mtx.RLock() vals = vals[:0] - for v := range p.m[n] { + for v := range p.m[slot].series[n] { vals = append(vals, v) } - p.mtx.RUnlock() + p.m[slot].mtx.RUnlock() // For each posting we first analyse whether the postings list is affected by the deletes. // If yes, we actually reallocate a new postings list. for _, l := range vals { // Only lock for processing one postings list so we don't block reads for too long. - p.mtx.Lock() + p.m[slot].mtx.Lock() found := false - for _, id := range p.m[n][l] { + for _, id := range p.m[slot].series[n][l] { if _, ok := deleted[id]; ok { found = true break } } if !found { - p.mtx.Unlock() + p.m[slot].mtx.Unlock() continue } - repl := make([]storage.SeriesRef, 0, len(p.m[n][l])) + repl := make([]storage.SeriesRef, 0, len(p.m[slot].series[n][l])) - for _, id := range p.m[n][l] { + for _, id := range p.m[slot].series[n][l] { if _, ok := deleted[id]; !ok { repl = append(repl, id) } } if len(repl) > 0 { - p.m[n][l] = repl + p.m[slot].series[n][l] = repl } else { - delete(p.m[n], l) + delete(p.m[slot].series[n], l) } - p.mtx.Unlock() + p.m[slot].mtx.Unlock() } - p.mtx.Lock() - if len(p.m[n]) == 0 { - delete(p.m, n) + p.m[slot].mtx.Lock() + if len(p.m[slot].series[n]) == 0 { + delete(p.m[slot].series, n) } - p.mtx.Unlock() + p.m[slot].mtx.Unlock() } } // Iter calls f for each postings list. It aborts if f returns an error and returns it. 
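Delete above keeps the pre-existing fine-grained behaviour of locking only while a single postings list is rewritten, but the lock it takes is now the owning shard's rather than the global one. A sketch of that per-list rewrite, again on the invented shardedIndex types from the first example:

// remove rewrites one key's postings list without the deleted refs, holding the
// owning shard's lock only for the duration of that single rewrite, which is the
// same "lock per postings list" idea MemPostings.Delete keeps after sharding.
func (idx *shardedIndex) remove(key string, deleted map[uint64]struct{}) {
	s := idx.shardFor(key)
	s.mtx.Lock()
	defer s.mtx.Unlock()

	old := s.series[key]
	repl := make([]uint64, 0, len(old))
	for _, ref := range old {
		if _, ok := deleted[ref]; !ok {
			repl = append(repl, ref)
		}
	}
	if len(repl) > 0 {
		s.series[key] = repl
	} else {
		delete(s.series, key) // drop the key entirely once its list is empty
	}
}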
func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error { - p.mtx.RLock() - defer p.mtx.RUnlock() - - for n, e := range p.m { - for v, p := range e { - if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil { - return err + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + for v, p := range e { + if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil { + return err + } } } + shard.mtx.RUnlock() } return nil } // Add a label set to the postings index. func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) { - p.mtx.Lock() - lset.Range(func(l labels.Label) { p.addFor(id, l) }) p.addFor(id, allPostingsKey) - - p.mtx.Unlock() } func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { - nm, ok := p.m[l.Name] + slot := xxhash.Sum64String(l.Name) % shardCount + + p.m[slot].mtx.Lock() + defer p.m[slot].mtx.Unlock() + + nm, ok := p.m[slot].series[l.Name] if !ok { nm = map[string][]storage.SeriesRef{} - p.m[l.Name] = nm + p.m[slot].series[l.Name] = nm } - list := append(nm[l.Value], id) - nm[l.Value] = list + // If ordering is disabled then simply append our id. if !p.ordered { + nm[l.Value] = append(nm[l.Value], id) return } + // There is no guarantee that no higher ID was inserted before as they may // be generated independently before adding them to postings. - // We repair order violations on insert. The invariant is that the first n-1 - // items in the list are already sorted. - for i := len(list) - 1; i >= 1; i-- { - if list[i] >= list[i-1] { - break - } - list[i], list[i-1] = list[i-1], list[i] - } + // We repair order violations on insert. + // Find the slot we need to insert our id into, grow the slice and insert our id. + index, _ := slices.BinarySearch(nm[l.Value], id) + nm[l.Value] = append(nm[l.Value], storage.SeriesRef(0)) + copy(nm[l.Value][index+1:], nm[l.Value][index:]) + nm[l.Value][index] = id } // ExpandPostings returns the postings expanded as a slice. 
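The new addFor replaces the element-by-element bubble-up repair with a binary search for the insertion point followed by a single copy to shift the tail. A self-contained sketch of that insertion, using the same golang.org/x/exp/slices package the file already imports; the insertSorted helper is illustrative and not part of the change:

package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

// insertSorted mirrors the new addFor logic: binary-search the insertion point,
// grow the slice by one, shift the tail right, and write the new id into place.
func insertSorted(refs []uint64, id uint64) []uint64 {
	i, _ := slices.BinarySearch(refs, id)
	refs = append(refs, 0)
	copy(refs[i+1:], refs[i:])
	refs[i] = id
	return refs
}

func main() {
	fmt.Println(insertSorted([]uint64{1, 2, 3, 4, 6, 7, 8}, 5)) // [1 2 3 4 5 6 7 8]
}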
diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index e8df6dbd290..2481e3baf56 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -24,6 +24,7 @@ import ( "strconv" "testing" + "github.com/cespare/xxhash/v2" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" @@ -32,17 +33,19 @@ import ( func TestMemPostings_addFor(t *testing.T) { p := NewMemPostings() - p.m[allPostingsKey.Name] = map[string][]storage.SeriesRef{} - p.m[allPostingsKey.Name][allPostingsKey.Value] = []storage.SeriesRef{1, 2, 3, 4, 6, 7, 8} + slot := xxhash.Sum64String(allPostingsKey.Name) % shardCount + p.m[slot].series[allPostingsKey.Name] = map[string][]storage.SeriesRef{} + p.m[slot].series[allPostingsKey.Name][allPostingsKey.Value] = []storage.SeriesRef{1, 2, 3, 4, 6, 7, 8} p.addFor(5, allPostingsKey) - require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey.Name][allPostingsKey.Value]) + require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}, p.m[slot].series[allPostingsKey.Name][allPostingsKey.Value]) } func TestMemPostings_ensureOrder(t *testing.T) { p := NewUnorderedMemPostings() - p.m["a"] = map[string][]storage.SeriesRef{} + slot := xxhash.Sum64String("a") % shardCount + p.m[slot].series["a"] = map[string][]storage.SeriesRef{} for i := 0; i < 100; i++ { l := make([]storage.SeriesRef, 100) @@ -51,18 +54,18 @@ func TestMemPostings_ensureOrder(t *testing.T) { } v := fmt.Sprintf("%d", i) - p.m["a"][v] = l + p.m[slot].series["a"][v] = l } p.EnsureOrder(0) - for _, e := range p.m { - for _, l := range e { - ok := sort.SliceIsSorted(l, func(i, j int) bool { - return l[i] < l[j] - }) - if !ok { - t.Fatalf("postings list %v is not sorted", l) + for _, shard := range p.m { + for _, e := range shard.series { + for _, l := range e { + ok := sort.SliceIsSorted(l, func(i, j int) bool { + return l[i] < l[j] + }) + require.True(t, ok, "postings list %v is not sorted", l) } } } @@ -98,7 +101,8 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) { // Generate postings. 
for l := 0; l < testData.numLabels; l++ { labelName := strconv.Itoa(l) - p.m[labelName] = map[string][]storage.SeriesRef{} + slot := xxhash.Sum64String(labelName) % shardCount + p.m[slot].series[labelName] = map[string][]storage.SeriesRef{} for v := 0; v < testData.numValuesPerLabel; v++ { refs := make([]storage.SeriesRef, testData.numRefsPerValue) @@ -107,7 +111,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) { } labelValue := strconv.Itoa(v) - p.m[labelName][labelValue] = refs + p.m[slot].series[labelName][labelValue] = refs } } @@ -1296,3 +1300,37 @@ func BenchmarkListPostings(b *testing.B) { }) } } + +func BenchmarkMemPostings_Add(b *testing.B) { + var series []struct { + id storage.SeriesRef + lset labels.Labels + } + for i := 0; i < 1000; i++ { + series = append(series, struct { + id storage.SeriesRef + lset labels.Labels + }{ + id: storage.SeriesRef(i), + lset: labels.FromStrings( + "job", fmt.Sprintf("job%d", i%100), + "instance", fmt.Sprintf("instance%d", i), + "cluster", "dev", + "label1", fmt.Sprintf("value%d", i%5), + "label2", fmt.Sprintf("value%d", i%5), + "label3", fmt.Sprintf("value%d", i%5), + "label4", fmt.Sprintf("value%d", i%5), + "label5", fmt.Sprintf("value%d", i%5), + ), + }) + } + + p := NewMemPostings() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < len(series); i++ { + p.Add(series[i].id, series[i].lset) + } + } +} diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index fdea756945a..abd4b3b786e 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -228,10 +228,28 @@ type wlMetrics struct { currentSegment prometheus.Gauge writesFailed prometheus.Counter walFileSize prometheus.GaugeFunc + + r prometheus.Registerer +} + +func (w *wlMetrics) Unregister() { + if w.r == nil { + return + } + w.r.Unregister(w.fsyncDuration) + w.r.Unregister(w.pageFlushes) + w.r.Unregister(w.pageCompletions) + w.r.Unregister(w.truncateFail) + w.r.Unregister(w.truncateTotal) + w.r.Unregister(w.currentSegment) + w.r.Unregister(w.writesFailed) + w.r.Unregister(w.walFileSize) } func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { - m := &wlMetrics{} + m := &wlMetrics{ + r: r, + } m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "fsync_duration_seconds", @@ -877,6 +895,8 @@ func (w *WL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } + + w.metrics.Unregister() w.closed = true return nil } diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index 8f4533e0eac..4107eb24b40 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -23,6 +23,8 @@ import ( "path/filepath" "testing" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -563,3 +565,13 @@ func BenchmarkWAL_Log(b *testing.B) { }) } } + +func TestUnregisterMetrics(t *testing.T) { + reg := prometheus.NewRegistry() + + for i := 0; i < 2; i++ { + wl, err := New(log.NewNopLogger(), reg, t.TempDir(), CompressionNone) + require.NoError(t, err) + require.NoError(t, wl.Close()) + } +}
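The wlog.go change keeps the Registerer inside wlMetrics so that Close can unregister every collector, which is what lets TestUnregisterMetrics open two WALs against the same registry without a duplicate-registration panic. Below is a minimal standalone sketch of that lifecycle; the component type and metric name are invented stand-ins for WL and wlMetrics, not the actual implementation.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// component registers its metrics on construction and unregisters them on Close,
// so the same Registerer can be handed to a successor instance.
type component struct {
	reg  prometheus.Registerer
	runs prometheus.Counter
}

func newComponent(reg prometheus.Registerer) *component {
	c := &component{
		reg:  reg,
		runs: prometheus.NewCounter(prometheus.CounterOpts{Name: "component_runs_total"}),
	}
	if reg != nil {
		reg.MustRegister(c.runs)
	}
	return c
}

func (c *component) Close() {
	if c.reg != nil {
		c.reg.Unregister(c.runs)
	}
}

func main() {
	reg := prometheus.NewRegistry()
	// Without Unregister in Close, the second MustRegister would panic with
	// an AlreadyRegisteredError for the same metric name.
	for i := 0; i < 2; i++ {
		c := newComponent(reg)
		c.Close()
	}
	fmt.Println("registered and unregistered twice on one registry")
}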