Skip to content

Commit

Permalink
tsdb: make changes after Bryan's review
Browse files Browse the repository at this point in the history
- Make changes less intrusive
- Turn the postings encoder type into a function
- Add NewWriterWithEncoder()

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Dec 19, 2023
1 parent 0c36917 commit d40910a
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 45 deletions.
6 changes: 3 additions & 3 deletions tsdb/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
Expand All @@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}

func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
Expand Down
2 changes: 1 addition & 1 deletion tsdb/blockwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
nil,
w.logger,
[]int64{w.blockSize},
chunkenc.NewPool(), LeveledCompactorOptions{})
chunkenc.NewPool(), nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}
Expand Down
21 changes: 17 additions & 4 deletions tsdb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,28 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {

// LeveledCompactorOptions holds the optional settings accepted by NewLeveledCompactorWithOptions.
// The zero value of every field selects the documented default.
type LeveledCompactorOptions struct {
// PE specifies the postings encoder. It is called when the compactor is writing out the postings for a label name/value pair during compaction.
// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.RawPostingsEncoder for more.
// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
// NOTE(review): the visible part of NewLeveledCompactorWithOptions only assigns the default when PE is nil
// and does not appear to copy a non-nil PE into the compactor — confirm that a custom encoder is honoured.
PE index.PostingsEncoder
// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc
}

func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
// NewLeveledCompactorWithChunkSize builds a LeveledCompactor whose options
// carry only the given maximum block chunk segment size and vertical merge
// function; every other option keeps its zero value. It delegates to
// NewLeveledCompactorWithOptions and returns whatever that returns.
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
	opts := LeveledCompactorOptions{
		MergeFunc:                mergeFunc,
		MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
	}
	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, opts)
}

// NewLeveledCompactor builds a LeveledCompactor that uses the given vertical
// merge function and leaves every other option at its zero value. It
// delegates to NewLeveledCompactorWithOptions and returns whatever that
// returns.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
	opts := LeveledCompactorOptions{MergeFunc: mergeFunc}
	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, opts)
}

func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, fmt.Errorf("at least one range must be provided")
}
Expand All @@ -175,7 +188,7 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log
}
var pe index.PostingsEncoder
if opts.PE == nil {
pe = &index.RawPostingsEncoder{}
pe = index.EncodePostingsRaw
}
return &LeveledCompactor{
ranges: ranges,
Expand Down Expand Up @@ -615,7 +628,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
}

indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
if err != nil {
return errors.Wrap(err, "open index writer")
}
Expand Down
18 changes: 9 additions & 9 deletions tsdb/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
},
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{})
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
require.NoError(t, err)

c.plan(metas)
Expand All @@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
180,
540,
1620,
}, nil, LeveledCompactorOptions{})
}, nil, nil)
require.NoError(t, err)

cases := map[string]struct {
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
240,
720,
2160,
}, nil, LeveledCompactorOptions{})
}, nil, nil)
require.NoError(t, err)

cases := []struct {
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
240,
720,
2160,
}, nil, LeveledCompactorOptions{})
}, nil, nil)
require.NoError(t, err)

tmpdir := t.TempDir()
Expand Down Expand Up @@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{})
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
require.NoError(t, err)

meta := &BlockMeta{
Expand Down Expand Up @@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs = append(blockDirs, block.Dir())
}

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
require.NoError(b, err)

b.ResetTimer()
Expand Down Expand Up @@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Sparse head compaction.
mint := sparseHead.MinTime()
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Old head compaction.
mint := oldHead.MinTime()
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
Expand Down
7 changes: 2 additions & 5 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
nil,
db.logger,
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
chunkenc.NewPool(),
LeveledCompactorOptions{},
chunkenc.NewPool(), nil,
)
if err != nil {
return errors.Wrap(err, "create leveled compactor")
Expand Down Expand Up @@ -820,9 +819,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}

ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
})
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
if err != nil {
cancel()
return nil, errors.Wrap(err, "create leveled compactor")
Expand Down
36 changes: 18 additions & 18 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ type symbolCacheEntry struct {
lastValue string
}

type PostingsEncoder interface {
EncodePostings(*encoding.Encbuf, []uint32)
}
// PostingsEncoder is a function that encodes one postings list into the
// given buffer. It returns an error if the list cannot be encoded.
type PostingsEncoder func(*encoding.Encbuf, []uint32) error

// Writer implements the IndexWriter interface for the standard
// serialization format.
Expand Down Expand Up @@ -192,11 +190,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}

// NewWriterWithEncoder returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
if postingsEncoder == nil {
postingsEncoder = &RawPostingsEncoder{}
}

// It uses the given encoder to encode each postings list.
func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
Expand Down Expand Up @@ -242,14 +237,20 @@ func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder)
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
postingsEncoder: postingsEncoder,
postingsEncoder: encoder,
}
if err := iw.writeMeta(); err != nil {
return nil, err
}
return iw, nil
}

// NewWriter creates a new index writer for the given filename using the
// default postings encoder, EncodePostingsRaw. See NewWriterWithEncoder to
// supply a different encoder.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
}

func (w *Writer) write(bufs ...[]byte) error {
return w.f.Write(bufs...)
}
Expand Down Expand Up @@ -952,16 +953,18 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}

// RawPostingsEncoder is the "basic" postings list encoding format with no compression:
// EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
type RawPostingsEncoder struct{}

func (r *RawPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
// EncodePostingsRaw writes offs to e as a big-endian uint32 element count
// followed by every offset as a big-endian uint32, with no compression.
//
// It always returns nil: a uint32 offset can never exceed 4 bytes, so there
// is nothing to validate per element. The error result exists so that the
// function can be used as a PostingsEncoder.
func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
	e.PutBE32int(len(offs))
	for _, off := range offs {
		e.PutBE32(off)
	}
	return nil
}

func (w *Writer) writePosting(name, value string, offs []uint32) error {
Expand All @@ -982,12 +985,9 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.cntPO++

w.buf1.Reset()
for _, off := range offs {
if off > (1<<32)-1 {
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
}
if err := w.postingsEncoder(&w.buf1, offs); err != nil {
return err
}
w.postingsEncoder.EncodePostings(&w.buf1, offs)

w.buf2.Reset()
l := w.buf1.Len()
Expand Down
10 changes: 5 additions & 5 deletions tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
fn := filepath.Join(dir, indexFilename)

// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background(), fn, nil)
iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)
require.NoError(t, iw.Close())

Expand All @@ -166,7 +166,7 @@ func TestIndexRW_Postings(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn, nil)
iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)

series := []labels.Labels{
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestPostingsMany(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn, nil)
iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)

// Create a label in the index which has 999 values.
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}

iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename), nil)
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
require.NoError(t, err)

syms := []string{}
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestPersistence_index_e2e(t *testing.T) {
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), nil)
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
require.NoError(t, err)

require.NoError(t, w.AddSymbol("__name__"))
Expand Down

0 comments on commit d40910a

Please sign in to comment.