Commit 66da1d5
Merge pull request prometheus#12003 from codesome/redundant-chunk-access
Remove unnecessary chunk fetch in Head queries
codesome authored Feb 22, 2023
2 parents bfcf69b + d504c95 commit 66da1d5
Showing 3 changed files with 29 additions and 37 deletions.
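In short: headChunkReader.Chunk has already fetched the chunk when it builds the safeChunk, so having safeChunk.Iterator fetch it again through the ChunkDiskMapper and memChunkPool was redundant; this commit passes the already-fetched chunk along instead. A minimal sketch of that pattern, using hypothetical toy types rather than the real Prometheus API:

// Minimal sketch of the pattern this commit applies (hypothetical types,
// not the actual Prometheus API): the reader fetches a chunk once, and the
// wrapper it returns keeps that chunk instead of re-fetching it from the
// store every time an iterator is requested.
package main

import "fmt"

type chunk struct{ samples []float64 }

type store map[int]*chunk

// fetch is a stand-in for the disk-mapper lookup.
func (st store) fetch(id int) (*chunk, error) {
	c, ok := st[id]
	if !ok {
		return nil, fmt.Errorf("chunk %d not found", id)
	}
	return c, nil
}

// Before: only the ID was kept, so every iterator request repeated the fetch.
type refetching struct {
	id int
	st store
}

func (r refetching) iterate() ([]float64, error) {
	c, err := r.st.fetch(r.id) // redundant second fetch
	if err != nil {
		return nil, err
	}
	return c.samples, nil
}

// After: the already-fetched chunk rides along, so no second fetch and no
// error path are needed.
type caching struct{ c *chunk }

func (c caching) iterate() []float64 { return c.c.samples }

func main() {
	st := store{7: {samples: []float64{2, 5, 9}}}
	c, err := st.fetch(7) // one fetch when the reader builds the wrapper
	if err != nil {
		panic(err)
	}
	fmt.Println(caching{c: c}.iterate()) // [2 5 9], no further lookups
}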
45 changes: 12 additions & 33 deletions tsdb/head_read.go
@@ -304,12 +304,10 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
     s.Unlock()

     return &safeChunk{
-        Chunk:           c.chunk,
-        s:               s,
-        cid:             cid,
-        isoState:        h.isoState,
-        chunkDiskMapper: h.head.chunkDiskMapper,
-        memChunkPool:    &h.head.memChunkPool,
+        Chunk:    c.chunk,
+        s:        s,
+        cid:      cid,
+        isoState: h.isoState,
     }, nil
 }

@@ -600,43 +598,24 @@ func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
 // safeChunk makes sure that the chunk can be accessed without a race condition
 type safeChunk struct {
     chunkenc.Chunk
-    s               *memSeries
-    cid             chunks.HeadChunkID
-    isoState        *isolationState
-    chunkDiskMapper *chunks.ChunkDiskMapper
-    memChunkPool    *sync.Pool
+    s        *memSeries
+    cid      chunks.HeadChunkID
+    isoState *isolationState
 }

 func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
     c.s.Lock()
-    it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter)
+    it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter)
     c.s.Unlock()
     return it
 }

 // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
 // It is unsafe to call this concurrently with s.append(...) without holding the series lock.
-func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator {
-    c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool)
-    // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
-    // series's chunk, which got then garbage collected before it got
-    // accessed. We must ensure to not garbage collect as long as any
-    // readers still hold a reference.
-    if err != nil {
-        return chunkenc.NewNopIterator()
-    }
-    defer func() {
-        if garbageCollect {
-            // Set this to nil so that Go GC can collect it after it has been used.
-            // This should be done always at the end.
-            c.chunk = nil
-            memChunkPool.Put(c)
-        }
-    }()
-
+func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
     ix := int(id) - int(s.firstChunkID)

-    numSamples := c.chunk.NumSamples()
+    numSamples := c.NumSamples()
     stopAfter := numSamples

     if isoState != nil && !isoState.IsolationDisabled() {
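Note how safeChunk.Iterator above holds the series lock only while the iterator is constructed, not while the caller consumes it. A sketch of that handout pattern, with assumed simplified types in place of chunkenc.Iterator:

// Sketch of the lock-guarded handout in safeChunk.Iterator (assumed toy
// types): the series mutex protects iterator construction only.
package main

import (
	"fmt"
	"sync"
)

type memSeries struct {
	mu      sync.Mutex
	samples []float64
}

// iterator must only be called with the series lock held; it copies what
// the reader is allowed to see so the lock can be released immediately.
func (s *memSeries) iterator(stopAfter int) []float64 {
	return append([]float64(nil), s.samples[:stopAfter]...)
}

type safeChunk struct {
	s         *memSeries
	stopAfter int
}

func (c *safeChunk) Iterator() []float64 {
	c.s.mu.Lock()
	it := c.s.iterator(c.stopAfter)
	c.s.mu.Unlock()
	return it
}

func main() {
	s := &memSeries{samples: []float64{2, 5, 9}}
	sc := &safeChunk{s: s, stopAfter: 2}
	fmt.Println(sc.Iterator()) // [2 5]
}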
@@ -681,9 +660,9 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator {
         return chunkenc.NewNopIterator()
     }
     if stopAfter == numSamples {
-        return c.chunk.Iterator(it)
+        return c.Iterator(it)
     }
-    return makeStopIterator(c.chunk, it, stopAfter)
+    return makeStopIterator(c, it, stopAfter)
 }

 // stopIterator wraps an Iterator, but only returns the first
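The stopAfter logic in the hunks above truncates a chunk's iterator when isolation forbids a reader from seeing the newest appends; makeStopIterator wraps the chunk iterator in a counting guard (the stopIterator whose doc comment is cut off by the collapsed view). A hedged sketch of that idea, with an assumed toy iterator interface standing in for chunkenc.Iterator:

// Hedged sketch of the stop-after-N guard behind makeStopIterator, using an
// assumed toy iterator interface in place of chunkenc.Iterator.
package main

import "fmt"

type iterator interface {
	Next() bool
	At() (int64, float64)
}

type sliceIterator struct {
	ts  []int64
	vs  []float64
	pos int
}

func (s *sliceIterator) Next() bool {
	if s.pos >= len(s.ts) {
		return false
	}
	s.pos++
	return true
}

func (s *sliceIterator) At() (int64, float64) { return s.ts[s.pos-1], s.vs[s.pos-1] }

// stopIterator yields at most stopAfter samples from the wrapped iterator.
type stopIterator struct {
	iterator
	i, stopAfter int
}

func (s *stopIterator) Next() bool {
	if s.i+1 >= s.stopAfter {
		return false
	}
	s.i++
	return s.iterator.Next()
}

func main() {
	inner := &sliceIterator{ts: []int64{100, 101, 102}, vs: []float64{2, 5, 9}}
	it := &stopIterator{iterator: inner, i: -1, stopAfter: 2}
	for it.Next() {
		t, v := it.At()
		fmt.Println(t, v) // prints the samples at 100 and 101 only
	}
}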
21 changes: 17 additions & 4 deletions tsdb/head_test.go
@@ -537,11 +537,18 @@ func TestHead_ReadWAL(t *testing.T) {
         require.NoError(t, c.Err())
         return x
     }
-    require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
-    require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
+
+    c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
+    require.NoError(t, err)
+    require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
+    c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
+    require.NoError(t, err)
+    require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
     // The samples before the new series record should be discarded since a duplicate record
     // is only possible when old samples were compacted.
-    require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
+    c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
+    require.NoError(t, err)
+    require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))

     q, err := head.ExemplarQuerier(context.Background())
     require.NoError(t, err)
@@ -2566,7 +2573,13 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
         require.True(t, ok, "sample append failed")
     }

-    it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
+    c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{
+        New: func() interface{} {
+            return &memChunk{}
+        },
+    })
+    require.NoError(t, err)
+    it := c.chunk.Iterator(nil)

     // First point.
     require.Equal(t, chunkenc.ValFloat, it.Seek(0))
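Since memSeries.iterator no longer takes a pool, the updated test supplies its own sync.Pool (with a New function producing *memChunk) directly to s.chunk, which may recycle the wrapper through that pool. A small sketch of this standard pooling pattern; the memChunk fields here are a stand-in, not the real struct:

// Sketch of the sync.Pool pattern used in the updated test: the pool hands
// out recycled *memChunk values (stand-in struct), and New is called only
// when the pool has nothing to reuse.
package main

import (
	"fmt"
	"sync"
)

type memChunk struct {
	minTime, maxTime int64
}

func main() {
	pool := &sync.Pool{
		New: func() interface{} { return &memChunk{} },
	}

	c := pool.Get().(*memChunk) // allocated via New on first use
	c.minTime, c.maxTime = 100, 200

	// Reset state before returning the value, much as the head code clears
	// c.chunk before putting a memChunk back.
	*c = memChunk{}
	pool.Put(c)

	reused := pool.Get().(*memChunk) // may be the same object, recycled
	fmt.Println(reused.minTime, reused.maxTime) // 0 0
}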
Empty file added tsdb/test.txt
