From 457ba8adfba04cc6dffca4901b27d68baa267801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Fri, 23 Feb 2024 17:01:37 +0000 Subject: [PATCH 1/6] Shard MemPostings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently all access to MemPostings is via a single mutex, which in effect serializes all Add() calls. This can cause MemPostings.Add() to get blocked for a long time in some cases, mostly when: - Prometheus was just started - There's a lot of scrape targets with plenty of metrics - There are indentical label key/value pairs attached to a lot of time series - There is no previous WAL files With such conditions all initial scrapes will happens withing the first scrape interval and will have to append entries to MemPostings causing high lock contention on that single mutex. Presence of 'common' labels that are attached to a lot of time series means that there will be a few entries in MemPostings where the slice of SeriesRef will be very big and that will trigger a lot of sorting calls - due to the fact that SeriesRef are oredered in the slices stored by MemPostings. This need to ensure order will make MemPostings.Add() calls slow and cpu intensive. All of that is problematic but usually most of it happens during WAL replay, so it's only visible on scraped data if there was no WAL to replay. There is https://github.com/prometheus/prometheus/pull/13632 that tries to speed up the code responsible for sorting SeriesRef in MemPostings slices. This change is to also shard MemPostings data structure so instead of a single lock we have a number of locks. Sharding happens on the label name so reads & writes on different label names might end up on a different shard and so allow a level of concurrency. This is similiar to how stripeSeries in tsdb/head.go works. Signed-off-by: Łukasz Mierzwa Signed-off-by: Giedrius Statkevičius --- tsdb/index/postings.go | 219 ++++++++++++++++++++---------------- tsdb/index/postings_test.go | 32 +++--- 2 files changed, 141 insertions(+), 110 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 222a8b0d6f2..ae7baaad62f 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/bboreham/go-loser" + "github.com/cespare/xxhash/v2" "golang.org/x/exp/slices" "github.com/prometheus/prometheus/model/labels" @@ -49,46 +50,63 @@ var ensureOrderBatchPool = sync.Pool{ }, } +const shardCount = 256 + +type memPostingMap struct { + mtx *sync.RWMutex + series map[string]map[string][]storage.SeriesRef +} + // MemPostings holds postings list for series ID per label pair. They may be written // to out of order. // EnsureOrder() must be called once before any reads are done. This allows for quick // unordered batch fills on startup. type MemPostings struct { - mtx sync.RWMutex - m map[string]map[string][]storage.SeriesRef + m [shardCount]memPostingMap ordered bool } // NewMemPostings returns a memPostings that's ready for reads and writes. func NewMemPostings() *MemPostings { - return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + p := MemPostings{ + m: [shardCount]memPostingMap{}, ordered: true, } + for i := 0; i < shardCount; i++ { + p.m[i].mtx = &sync.RWMutex{} + p.m[i].series = make(map[string]map[string][]storage.SeriesRef, 512) + } + return &p } // NewUnorderedMemPostings returns a memPostings that is not safe to be read from // until EnsureOrder() was called once. func NewUnorderedMemPostings() *MemPostings { - return &MemPostings{ - m: make(map[string]map[string][]storage.SeriesRef, 512), + p := MemPostings{ + m: [shardCount]memPostingMap{}, ordered: false, } + for i := 0; i < shardCount; i++ { + p.m[i].mtx = &sync.RWMutex{} + p.m[i].series = make(map[string]map[string][]storage.SeriesRef, 512) + } + return &p } // Symbols returns an iterator over all unique name and value strings, in order. func (p *MemPostings) Symbols() StringIter { - p.mtx.RLock() - // Add all the strings to a map to de-duplicate. symbols := make(map[string]struct{}, 512) - for n, e := range p.m { - symbols[n] = struct{}{} - for v := range e { - symbols[v] = struct{}{} + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + symbols[n] = struct{}{} + for v := range e { + symbols[v] = struct{}{} + } } + shard.mtx.RUnlock() } - p.mtx.RUnlock() res := make([]string, 0, len(symbols)) for k := range symbols { @@ -101,15 +119,17 @@ func (p *MemPostings) Symbols() StringIter { // SortedKeys returns a list of sorted label keys of the postings. func (p *MemPostings) SortedKeys() []labels.Label { - p.mtx.RLock() - keys := make([]labels.Label, 0, len(p.m)) + keys := make([]labels.Label, 0, shardCount) - for n, e := range p.m { - for v := range e { - keys = append(keys, labels.Label{Name: n, Value: v}) + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + for v := range e { + keys = append(keys, labels.Label{Name: n, Value: v}) + } } + shard.mtx.RUnlock() } - p.mtx.RUnlock() slices.SortFunc(keys, func(a, b labels.Label) int { nameCompare := strings.Compare(a.Name, b.Name) @@ -125,29 +145,28 @@ func (p *MemPostings) SortedKeys() []labels.Label { // LabelNames returns all the unique label names. func (p *MemPostings) LabelNames() []string { - p.mtx.RLock() - defer p.mtx.RUnlock() - n := len(p.m) - if n == 0 { - return nil - } - - names := make([]string, 0, n-1) - for name := range p.m { - if name != allPostingsKey.Name { - names = append(names, name) + names := make([]string, 0, shardCount) + for _, shard := range p.m { + shard.mtx.RLock() + for name := range shard.series { + if name != allPostingsKey.Name { + names = append(names, name) + } } + shard.mtx.RUnlock() } return names } // LabelValues returns label values for the given name. func (p *MemPostings) LabelValues(_ context.Context, name string) []string { - p.mtx.RLock() - defer p.mtx.RUnlock() + slot := xxhash.Sum64String(name) % shardCount + + p.m[slot].mtx.RLock() + defer p.m[slot].mtx.RUnlock() - values := make([]string, 0, len(p.m[name])) - for v := range p.m[name] { + values := make([]string, 0, len(p.m[slot].series[name])) + for v := range p.m[slot].series[name] { values = append(values, v) } return values @@ -165,7 +184,6 @@ type PostingsStats struct { // Stats calculates the cardinality statistics from postings. func (p *MemPostings) Stats(label string, limit int) *PostingsStats { var size uint64 - p.mtx.RLock() metrics := &maxHeap{} labels := &maxHeap{} @@ -178,26 +196,28 @@ func (p *MemPostings) Stats(label string, limit int) *PostingsStats { labelValueLength.init(limit) labelValuePairs.init(limit) - for n, e := range p.m { - if n == "" { - continue - } - labels.push(Stat{Name: n, Count: uint64(len(e))}) - numLabelPairs += len(e) - size = 0 - for name, values := range e { - if n == label { - metrics.push(Stat{Name: name, Count: uint64(len(values))}) + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + if n == "" { + continue + } + labels.push(Stat{Name: n, Count: uint64(len(e))}) + numLabelPairs += len(e) + size = 0 + for name, values := range e { + if n == label { + metrics.push(Stat{Name: name, Count: uint64(len(values))}) + } + seriesCnt := uint64(len(values)) + labelValuePairs.push(Stat{Name: n + "=" + name, Count: seriesCnt}) + size += uint64(len(name)) * seriesCnt } - seriesCnt := uint64(len(values)) - labelValuePairs.push(Stat{Name: n + "=" + name, Count: seriesCnt}) - size += uint64(len(name)) * seriesCnt + labelValueLength.push(Stat{Name: n, Count: size}) } - labelValueLength.push(Stat{Name: n, Count: size}) + shard.mtx.RUnlock() } - p.mtx.RUnlock() - return &PostingsStats{ CardinalityMetricsStats: metrics.get(), CardinalityLabelStats: labels.get(), @@ -210,12 +230,13 @@ func (p *MemPostings) Stats(label string, limit int) *PostingsStats { // Get returns a postings list for the given label pair. func (p *MemPostings) Get(name, value string) Postings { var lp []storage.SeriesRef - p.mtx.RLock() - l := p.m[name] + slot := xxhash.Sum64String(name) % shardCount + p.m[slot].mtx.RLock() + l := p.m[slot].series[name] if l != nil { lp = l[value] } - p.mtx.RUnlock() + p.m[slot].mtx.RUnlock() if lp == nil { return EmptyPostings() @@ -234,9 +255,6 @@ func (p *MemPostings) All() Postings { // CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used. // GOMAXPROCS was the default before introducing this parameter. func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { - p.mtx.Lock() - defer p.mtx.Unlock() - if p.ordered { return } @@ -265,15 +283,19 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) { } nextJob := ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) - for _, e := range p.m { - for _, l := range e { - *nextJob = append(*nextJob, l) - - if len(*nextJob) >= ensureOrderBatchSize { - workc <- nextJob - nextJob = ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) + for _, shard := range p.m { + shard.mtx.Lock() + for _, e := range shard.series { + for _, l := range e { + *nextJob = append(*nextJob, l) + + if len(*nextJob) >= ensureOrderBatchSize { + workc <- nextJob + nextJob = ensureOrderBatchPool.Get().(*[][]storage.SeriesRef) + } } } + shard.mtx.Unlock() } // If the last job was partially filled, we need to push it to workers too. @@ -293,91 +315,96 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) { // Collect all keys relevant for deletion once. New keys added afterwards // can by definition not be affected by any of the given deletes. - p.mtx.RLock() - for n := range p.m { - keys = append(keys, n) + for _, shard := range p.m { + shard.mtx.RLock() + for n := range shard.series { + keys = append(keys, n) + } + shard.mtx.RUnlock() } - p.mtx.RUnlock() for _, n := range keys { - p.mtx.RLock() + slot := xxhash.Sum64String(n) % shardCount + p.m[slot].mtx.RLock() vals = vals[:0] - for v := range p.m[n] { + for v := range p.m[slot].series[n] { vals = append(vals, v) } - p.mtx.RUnlock() + p.m[slot].mtx.RUnlock() // For each posting we first analyse whether the postings list is affected by the deletes. // If yes, we actually reallocate a new postings list. for _, l := range vals { // Only lock for processing one postings list so we don't block reads for too long. - p.mtx.Lock() + p.m[slot].mtx.Lock() found := false - for _, id := range p.m[n][l] { + for _, id := range p.m[slot].series[n][l] { if _, ok := deleted[id]; ok { found = true break } } if !found { - p.mtx.Unlock() + p.m[slot].mtx.Unlock() continue } - repl := make([]storage.SeriesRef, 0, len(p.m[n][l])) + repl := make([]storage.SeriesRef, 0, len(p.m[slot].series[n][l])) - for _, id := range p.m[n][l] { + for _, id := range p.m[slot].series[n][l] { if _, ok := deleted[id]; !ok { repl = append(repl, id) } } if len(repl) > 0 { - p.m[n][l] = repl + p.m[slot].series[n][l] = repl } else { - delete(p.m[n], l) + delete(p.m[slot].series[n], l) } - p.mtx.Unlock() + p.m[slot].mtx.Unlock() } - p.mtx.Lock() - if len(p.m[n]) == 0 { - delete(p.m, n) + p.m[slot].mtx.Lock() + if len(p.m[slot].series[n]) == 0 { + delete(p.m[slot].series, n) } - p.mtx.Unlock() + p.m[slot].mtx.Unlock() } } // Iter calls f for each postings list. It aborts if f returns an error and returns it. func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error { - p.mtx.RLock() - defer p.mtx.RUnlock() - - for n, e := range p.m { - for v, p := range e { - if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil { - return err + for _, shard := range p.m { + shard.mtx.RLock() + for n, e := range shard.series { + for v, p := range e { + if err := f(labels.Label{Name: n, Value: v}, newListPostings(p...)); err != nil { + return err + } } } + shard.mtx.RUnlock() } return nil } // Add a label set to the postings index. func (p *MemPostings) Add(id storage.SeriesRef, lset labels.Labels) { - p.mtx.Lock() - lset.Range(func(l labels.Label) { p.addFor(id, l) }) p.addFor(id, allPostingsKey) - - p.mtx.Unlock() } func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { - nm, ok := p.m[l.Name] + slot := xxhash.Sum64String(l.Name) % shardCount + + p.m[slot].mtx.Lock() + defer p.m[slot].mtx.Unlock() + + nm, ok := p.m[slot].series[l.Name] if !ok { nm = map[string][]storage.SeriesRef{} - p.m[l.Name] = nm + p.m[slot].series[l.Name] = nm } list := append(nm[l.Value], id) nm[l.Value] = list diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index e8df6dbd290..161c5aabfa6 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -24,6 +24,7 @@ import ( "strconv" "testing" + "github.com/cespare/xxhash/v2" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" @@ -32,17 +33,19 @@ import ( func TestMemPostings_addFor(t *testing.T) { p := NewMemPostings() - p.m[allPostingsKey.Name] = map[string][]storage.SeriesRef{} - p.m[allPostingsKey.Name][allPostingsKey.Value] = []storage.SeriesRef{1, 2, 3, 4, 6, 7, 8} + slot := xxhash.Sum64String(allPostingsKey.Name) % shardCount + p.m[slot].series[allPostingsKey.Name] = map[string][]storage.SeriesRef{} + p.m[slot].series[allPostingsKey.Name][allPostingsKey.Value] = []storage.SeriesRef{1, 2, 3, 4, 6, 7, 8} p.addFor(5, allPostingsKey) - require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}, p.m[allPostingsKey.Name][allPostingsKey.Value]) + require.Equal(t, []storage.SeriesRef{1, 2, 3, 4, 5, 6, 7, 8}, p.m[slot].series[allPostingsKey.Name][allPostingsKey.Value]) } func TestMemPostings_ensureOrder(t *testing.T) { p := NewUnorderedMemPostings() - p.m["a"] = map[string][]storage.SeriesRef{} + slot := xxhash.Sum64String("a") % shardCount + p.m[slot].series["a"] = map[string][]storage.SeriesRef{} for i := 0; i < 100; i++ { l := make([]storage.SeriesRef, 100) @@ -51,18 +54,18 @@ func TestMemPostings_ensureOrder(t *testing.T) { } v := fmt.Sprintf("%d", i) - p.m["a"][v] = l + p.m[slot].series["a"][v] = l } p.EnsureOrder(0) - for _, e := range p.m { - for _, l := range e { - ok := sort.SliceIsSorted(l, func(i, j int) bool { - return l[i] < l[j] - }) - if !ok { - t.Fatalf("postings list %v is not sorted", l) + for _, shard := range p.m { + for _, e := range shard.series { + for _, l := range e { + ok := sort.SliceIsSorted(l, func(i, j int) bool { + return l[i] < l[j] + }) + require.True(t, ok, "postings list %v is not sorted", l) } } } @@ -98,7 +101,8 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) { // Generate postings. for l := 0; l < testData.numLabels; l++ { labelName := strconv.Itoa(l) - p.m[labelName] = map[string][]storage.SeriesRef{} + slot := xxhash.Sum64String(labelName) % shardCount + p.m[slot].series[labelName] = map[string][]storage.SeriesRef{} for v := 0; v < testData.numValuesPerLabel; v++ { refs := make([]storage.SeriesRef, testData.numRefsPerValue) @@ -107,7 +111,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) { } labelValue := strconv.Itoa(v) - p.m[labelName][labelValue] = refs + p.m[slot].series[labelName][labelValue] = refs } } From 91cf5e1f9418c04a4f577b6e4b530f28bd115207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Fri, 23 Feb 2024 13:43:22 +0000 Subject: [PATCH 2/6] Speed up MemPostings.addFor() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MemPosting.Add() can be very slow because of the requirement to ensure ordering of SeriesRef values. Speed it up by sort on insert instead of appending and the sorting - this way we scan the slice for the correct place in the slice to insert into. I've used the code from https://stackoverflow.com/questions/42746972/insert-to-a-sorted-slice to insert into a sorted slice. The problem with this code being slow is that all time series must be added to MemPostings when they are first scraped. If MemPostings.Add() is slow then this will block first scrape of each time series because there is a single mutex that guards writes. This can cause missing scraped data for a lot of time series when you restart Prometheus and all time series are initially added. What saves us is that if you restart Prometheus it will replay WAL, which will have most of the scraped time series in it, so while WAL is being replayed MemPostings is being populated and the slowness only makes WAL replay slow, it doesn't affect scrapes. This makes this problem invisible to most users unless they nuke the data directory and then restart Prometheus. Another trigger is having a lot of common labels on time series, if theres'a label/value pair set on all time series then all these time series references will have to be added to a single slice on MemPostings, which will trigger a lot of re-sortings on each Add(). Benchmark results: goos: linux goarch: amd64 pkg: github.com/prometheus/prometheus/tsdb/index cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz │ main.txt │ new.txt │ │ sec/op │ sec/op vs base │ MemPostings_Add-8 91.49m ± 2% 15.41m ± 0% -83.15% (p=0.000 n=20) │ main.txt │ new.txt │ │ B/op │ B/op vs base │ MemPostings_Add-8 288.1Ki ± 0% 288.1Ki ± 0% ~ (p=0.560 n=20) │ main.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ MemPostings_Add-8 98.00 ± 0% 98.00 ± 0% ~ (p=1.000 n=20) ¹ ¹ all samples are equal Signed-off-by: Łukasz Mierzwa --- tsdb/index/postings.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index ae7baaad62f..0162a41209f 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -406,22 +406,21 @@ func (p *MemPostings) addFor(id storage.SeriesRef, l labels.Label) { nm = map[string][]storage.SeriesRef{} p.m[slot].series[l.Name] = nm } - list := append(nm[l.Value], id) - nm[l.Value] = list + // If ordering is disabled then simply append our id. if !p.ordered { + nm[l.Value] = append(nm[l.Value], id) return } + // There is no guarantee that no higher ID was inserted before as they may // be generated independently before adding them to postings. - // We repair order violations on insert. The invariant is that the first n-1 - // items in the list are already sorted. - for i := len(list) - 1; i >= 1; i-- { - if list[i] >= list[i-1] { - break - } - list[i], list[i-1] = list[i-1], list[i] - } + // We repair order violations on insert. + // Find the slot we need to insert our id into, grow the slice and insert our id. + index, _ := slices.BinarySearch(nm[l.Value], id) + nm[l.Value] = append(nm[l.Value], storage.SeriesRef(0)) + copy(nm[l.Value][index+1:], nm[l.Value][index:]) + nm[l.Value][index] = id } // ExpandPostings returns the postings expanded as a slice. From b9b582f3afde64b95c8768d708466a983daba9d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Mierzwa?= Date: Fri, 23 Feb 2024 13:24:31 +0000 Subject: [PATCH 3/6] Add a benchmark for MemPostings.Add() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Łukasz Mierzwa --- tsdb/index/postings_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 161c5aabfa6..2481e3baf56 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1300,3 +1300,37 @@ func BenchmarkListPostings(b *testing.B) { }) } } + +func BenchmarkMemPostings_Add(b *testing.B) { + var series []struct { + id storage.SeriesRef + lset labels.Labels + } + for i := 0; i < 1000; i++ { + series = append(series, struct { + id storage.SeriesRef + lset labels.Labels + }{ + id: storage.SeriesRef(i), + lset: labels.FromStrings( + "job", fmt.Sprintf("job%d", i%100), + "instance", fmt.Sprintf("instance%d", i), + "cluster", "dev", + "label1", fmt.Sprintf("value%d", i%5), + "label2", fmt.Sprintf("value%d", i%5), + "label3", fmt.Sprintf("value%d", i%5), + "label4", fmt.Sprintf("value%d", i%5), + "label5", fmt.Sprintf("value%d", i%5), + ), + }) + } + + p := NewMemPostings() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < len(series); i++ { + p.Add(series[i].id, series[i].lset) + } + } +} From 627346924a63df93cd575060820614b8fb535f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 11 Apr 2024 11:30:05 +0300 Subject: [PATCH 4/6] tsdb/wlog: unregister metrics on WL close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thanos can create and destroy TSDBs dynamically, and once a TSDB disappears its files are deleted. Calculating the size of the WAL then fails with errors like: ``` msg: "Failed to calculate size of "wal" dir", "err": "lstat /tsdbdir/wal: no such file or directory", "caller": "wlog.go:271" ``` Signed-off-by: Giedrius Statkevičius --- tsdb/wlog/wlog.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index fdea756945a..abd4b3b786e 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -228,10 +228,28 @@ type wlMetrics struct { currentSegment prometheus.Gauge writesFailed prometheus.Counter walFileSize prometheus.GaugeFunc + + r prometheus.Registerer +} + +func (w *wlMetrics) Unregister() { + if w.r == nil { + return + } + w.r.Unregister(w.fsyncDuration) + w.r.Unregister(w.pageFlushes) + w.r.Unregister(w.pageCompletions) + w.r.Unregister(w.truncateFail) + w.r.Unregister(w.truncateTotal) + w.r.Unregister(w.currentSegment) + w.r.Unregister(w.writesFailed) + w.r.Unregister(w.walFileSize) } func newWLMetrics(w *WL, r prometheus.Registerer) *wlMetrics { - m := &wlMetrics{} + m := &wlMetrics{ + r: r, + } m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "fsync_duration_seconds", @@ -877,6 +895,8 @@ func (w *WL) Close() (err error) { if err := w.segment.Close(); err != nil { level.Error(w.logger).Log("msg", "close previous segment", "err", err) } + + w.metrics.Unregister() w.closed = true return nil } From c6198815f94e87291479716858b682bc2c652684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Thu, 18 Apr 2024 11:11:37 +0300 Subject: [PATCH 5/6] tsdb/wlog: add test for metrics unregistering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Giedrius Statkevičius --- tsdb/wlog/wlog_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tsdb/wlog/wlog_test.go b/tsdb/wlog/wlog_test.go index 8f4533e0eac..4107eb24b40 100644 --- a/tsdb/wlog/wlog_test.go +++ b/tsdb/wlog/wlog_test.go @@ -23,6 +23,8 @@ import ( "path/filepath" "testing" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -563,3 +565,13 @@ func BenchmarkWAL_Log(b *testing.B) { }) } } + +func TestUnregisterMetrics(t *testing.T) { + reg := prometheus.NewRegistry() + + for i := 0; i < 2; i++ { + wl, err := New(log.NewNopLogger(), reg, t.TempDir(), CompressionNone) + require.NoError(t, err) + require.NoError(t, wl.Close()) + } +} From b68fe2299cd8cd2eb0a856bf72862162c9307c2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 17 Apr 2024 17:15:07 +0300 Subject: [PATCH 6/6] tsdb/db: change log level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In Thanos/Mimir/etc. it is possible to have OOO support enabled without vertical compaction. This leads to constant messages during reloadBlocks. As a user, this is what I wanted and I know that this is happening, it's not a problem. Reduce the log level to avoid this spam. Signed-off-by: Giedrius Statkevičius --- tsdb/db.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tsdb/db.go b/tsdb/db.go index 2436fab2ace..8fea0fa6ccb 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1510,7 +1510,7 @@ func (db *DB) reloadBlocks() (err error) { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) + level.Debug(db.logger).Log("msg", "Overlapping blocks found during reloadBlocks", "detail", overlaps.String()) } // Append blocks to old, deletable blocks, so we can close them.