Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #649 from prometheus/lr-inject-metrics
Browse files Browse the repository at this point in the history
wal: Inject LiveReader metrics rather than registry
  • Loading branch information
brancz authored Jul 3, 2019
2 parents 7ef1018 + 078895b commit d230c67
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## Master / unreleased

## 0.9.1

- [CHANGE] LiveReader metrics are now injected rather than global.

## 0.9.0

- [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
Expand All @@ -9,7 +13,7 @@
- [BUGFIX] `prometheus_tsdb_compactions_failed_total` is now incremented on any compaction failure.
- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
- [CHANGE] Create new clean segment when starting the WAL.
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`
- [CHANGE] Renamed metric from `prometheus_tsdb_wal_reader_corruption_errors` to `prometheus_tsdb_wal_reader_corruption_errors_total`.
- [ENHANCEMENT] Improved atomicity of .tmp block replacement during compaction for usual case.
- [ENHANCEMENT] Improved postings intersection matching.
- [ENHANCEMENT] Reduced disk usage for WAL for small setups.
Expand Down
42 changes: 28 additions & 14 deletions wal/live_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,40 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// liveReaderMetrics holds all metrics exposed by the LiveReader.
type liveReaderMetrics struct {
readerCorruptionErrors *prometheus.CounterVec
}

// LiveReaderMetrics instatiates, registers and returns metrics to be injected
// at LiveReader instantiation.
func NewLiveReaderMetrics(reg prometheus.Registerer) *liveReaderMetrics {
m := &liveReaderMetrics{
readerCorruptionErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"}),
}

if reg != nil {
reg.Register(m.readerCorruptionErrors)
}

return m
}

// NewLiveReader returns a new live reader.
func NewLiveReader(logger log.Logger, reg prometheus.Registerer, r io.Reader) *LiveReader {
func NewLiveReader(logger log.Logger, metrics *liveReaderMetrics, r io.Reader) *LiveReader {
lr := &LiveReader{
logger: logger,
rdr: r,
logger: logger,
rdr: r,
metrics: metrics,

// Until we understand how they come about, make readers permissive
// to records spanning pages.
permissive: true,
}

lr.readerCorruptionErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "prometheus_tsdb_wal_reader_corruption_errors_total",
Help: "Errors encountered when reading the WAL.",
}, []string{"error"})

if reg != nil {
reg.MustRegister(lr.readerCorruptionErrors)
}

return lr
}

Expand Down Expand Up @@ -74,7 +88,7 @@ type LiveReader struct {
// NB the non-ive Reader implementation allows for this.
permissive bool

readerCorruptionErrors *prometheus.CounterVec
metrics *liveReaderMetrics
}

// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
Expand Down Expand Up @@ -282,7 +296,7 @@ func (r *LiveReader) readRecord() ([]byte, int, error) {
if !r.permissive {
return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
}
r.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
r.metrics.readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
}
if recordHeaderSize+length > pageSize {
Expand Down
6 changes: 3 additions & 3 deletions wal/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var readerConstructors = map[string]func(io.Reader) reader{
return NewReader(r)
},
"LiveReader": func(r io.Reader) reader {
lr := NewLiveReader(log.NewNopLogger(), nil, r)
lr := NewLiveReader(log.NewNopLogger(), NewLiveReaderMetrics(nil), r)
lr.eofNonErr = true
return lr
},
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestReader_Live(t *testing.T) {
// Read from a second FD on the same file.
readFd, err := os.Open(writeFd.Name())
testutil.Ok(t, err)
reader := NewLiveReader(logger, nil, readFd)
reader := NewLiveReader(logger, NewLiveReaderMetrics(nil), readFd)
for _, exp := range testReaderCases[i].exp {
for !reader.Next() {
testutil.Assert(t, reader.Err() == io.EOF, "expect EOF, got: %v", reader.Err())
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestLiveReaderCorrupt_RecordTooLongAndShort(t *testing.T) {
testutil.Ok(t, err)
defer seg.Close()

r := NewLiveReader(logger, nil, seg)
r := NewLiveReader(logger, NewLiveReaderMetrics(nil), seg)
testutil.Assert(t, r.Next() == false, "expected no records")
testutil.Assert(t, r.Err().Error() == "record length greater than a single page: 65542 > 32768", "expected error, got: %v", r.Err())
}
Expand Down

0 comments on commit d230c67

Please sign in to comment.