diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index 1c7a5d56862..46e6ecf8441 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
 			require.NoError(t, err)
 			require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
 
-			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
+			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
 			require.NoError(t, err)
 			blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
 			require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 }
 
 func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
 }
 
 func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index 6a67b34ddb7..0d017e095f4 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), LeveledCompactorOptions{})
+		chunkenc.NewPool(), nil)
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
 	}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 5c73f70a146..b8bd9881cc9 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -147,7 +147,7 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
 
 type LeveledCompactorOptions struct {
 	// PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
-	// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.RawPostingsEncoder for more.
+	// If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
 	PE index.PostingsEncoder
 	// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
 	MaxBlockChunkSegmentSize int64
@@ -155,7 +155,20 @@ type LeveledCompactorOptions struct {
 	MergeFunc storage.VerticalChunkSeriesMergeFunc
 }
 
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
+		MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
+		MergeFunc:                mergeFunc,
+	})
+}
+
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
+		MergeFunc: mergeFunc,
+	})
+}
+
+func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, fmt.Errorf("at least one range must be provided")
 	}
@@ -175,7 +188,7 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log
 	}
 	var pe index.PostingsEncoder
 	if opts.PE == nil {
-		pe = &index.RawPostingsEncoder{}
+		pe = index.EncodePostingsRaw
 	}
 	return &LeveledCompactor{
 		ranges: ranges,
@@ -615,7 +628,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
 		}
 	}
 
-	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
+	indexw, err := index.NewWriterWithEncoder(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
 	if err != nil {
 		return errors.Wrap(err, "open index writer")
 	}
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 55b23bf99db..94b35e3b4c2 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{})
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
 	require.NoError(t, err)
 
 	c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 		180,
 		540,
 		1620,
-	}, nil, LeveledCompactorOptions{})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, LeveledCompactorOptions{})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, LeveledCompactorOptions{})
+	}, nil, nil)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{})
+			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
 			blockDirs = append(blockDirs, block.Dir())
 		}
 
-		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
 		require.NoError(b, err)
 
 		b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
 			// Compaction.
 			mint := head.MinTime()
 			maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
+			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
 			require.NoError(t, err)
 			id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
 			require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 				// Sparse head compaction.
 				mint := sparseHead.MinTime()
 				maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-				compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
+				compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
 				require.NoError(t, err)
 				sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
 				require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 				// Old head compaction.
 				mint := oldHead.MinTime()
 				maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-				compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
+				compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
 				require.NoError(t, err)
 				oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
 				require.NoError(t, err)
diff --git a/tsdb/db.go b/tsdb/db.go
index 8ca3132541a..6e9396187b6 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -450,8 +450,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 		nil,
 		db.logger,
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
-		chunkenc.NewPool(),
-		LeveledCompactorOptions{},
+		chunkenc.NewPool(), nil,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create leveled compactor")
 	}
@@ -820,9 +819,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
-	db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
-		MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
-	})
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
 	}
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index eab291bcab2..d759e8bd6cb 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -110,9 +110,7 @@ type symbolCacheEntry struct {
 	lastValue string
 }
 
-type PostingsEncoder interface {
-	EncodePostings(*encoding.Encbuf, []uint32)
-}
+type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 
 // Writer implements the IndexWriter interface for the standard
 // serialization format.
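
A note on the `PostingsEncoder` hunk above: because it is now a plain function type rather than an interface, a custom encoder is just a closure with the matching signature. The sketch below is illustrative only and not part of the change; `countingEncoder` and its counter are hypothetical names, while `index.PostingsEncoder`, `index.EncodePostingsRaw`, and `encoding.Encbuf` come from the code being patched.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/tsdb/index"
)

// countingEncoder (hypothetical) satisfies the new index.PostingsEncoder
// function type: it delegates to the default raw encoder and counts how
// many postings lists it has encoded.
func countingEncoder(calls *int) index.PostingsEncoder {
	return func(e *encoding.Encbuf, offs []uint32) error {
		*calls++
		return index.EncodePostingsRaw(e, offs)
	}
}

func main() {
	var calls int
	enc := countingEncoder(&calls)

	var buf encoding.Encbuf
	if err := enc(&buf, []uint32{1, 2, 3}); err != nil {
		panic(err)
	}
	fmt.Printf("encoded %d list(s), %d bytes\n", calls, buf.Len())
}
```

The added error return also lets an encoder reject bad input (as `EncodePostingsRaw` does for oversized offsets) instead of failing silently.
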
@@ -192,11 +190,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 }
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
-func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
-	if postingsEncoder == nil {
-		postingsEncoder = &RawPostingsEncoder{}
-	}
-
+// It uses the given encoder to encode each postings list.
+func NewWriterWithEncoder(ctx context.Context, fn string, encoder PostingsEncoder) (*Writer, error) {
 	dir := filepath.Dir(fn)
 
 	df, err := fileutil.OpenDir(dir)
@@ -242,7 +237,7 @@ func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder)
 		symbolCache:     make(map[string]symbolCacheEntry, 1<<8),
 		labelNames:      make(map[string]uint64, 1<<8),
 		crc32:           newCRC32(),
-		postingsEncoder: postingsEncoder,
+		postingsEncoder: encoder,
 	}
 	if err := iw.writeMeta(); err != nil {
 		return nil, err
@@ -250,6 +245,12 @@ func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder)
 	return iw, nil
 }
 
+// NewWriter creates a new index writer using the default encoder. See
+// NewWriterWithEncoder.
+func NewWriter(ctx context.Context, fn string) (*Writer, error) {
+	return NewWriterWithEncoder(ctx, fn, EncodePostingsRaw)
+}
+
 func (w *Writer) write(bufs ...[]byte) error {
 	return w.f.Write(bufs...)
 }
@@ -952,16 +953,18 @@ func (w *Writer) writePostingsToTmpFiles() error {
 	return nil
 }
 
-// RawPostingsEncoder is the "basic" postings list encoding format with no compression:
+// EncodePostingsRaw uses the "basic" postings list encoding format with no compression:
 // <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
-type RawPostingsEncoder struct{}
-
-func (r *RawPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
+func EncodePostingsRaw(e *encoding.Encbuf, offs []uint32) error {
 	e.PutBE32int(len(offs))
 
 	for _, off := range offs {
+		if off > (1<<32)-1 {
+			return fmt.Errorf("series offset %d exceeds 4 bytes", off)
+		}
 		e.PutBE32(off)
 	}
+	return nil
 }
 
 func (w *Writer) writePosting(name, value string, offs []uint32) error {
@@ -982,12 +985,9 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
 	w.cntPO++
 
 	w.buf1.Reset()
-	for _, off := range offs {
-		if off > (1<<32)-1 {
-			return fmt.Errorf("series offset %d exceeds 4 bytes", off)
-		}
+	if err := w.postingsEncoder(&w.buf1, offs); err != nil {
+		return err
 	}
-	w.postingsEncoder.EncodePostings(&w.buf1, offs)
 
 	w.buf2.Reset()
 	l := w.buf1.Len()
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index 056f8402ed4..6c5e313d434 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -141,7 +141,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 	fn := filepath.Join(dir, indexFilename)
 
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background(), fn, nil)
+	iw, err := NewWriter(context.Background(), fn)
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
@@ -166,7 +166,7 @@ func TestIndexRW_Postings(t *testing.T) {
 
 	fn := filepath.Join(dir, indexFilename)
 
-	iw, err := NewWriter(context.Background(), fn, nil)
+	iw, err := NewWriter(context.Background(), fn)
 	require.NoError(t, err)
 
 	series := []labels.Labels{
@@ -250,7 +250,7 @@ func TestPostingsMany(t *testing.T) {
 
 	fn := filepath.Join(dir, indexFilename)
 
-	iw, err := NewWriter(context.Background(), fn, nil)
+	iw, err := NewWriter(context.Background(), fn)
 	require.NoError(t, err)
 
 	// Create a label in the index which has 999 values.
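
For the writer-side hunks above: `NewWriter` keeps a two-argument shape for callers that want the default encoding, and `NewWriterWithEncoder` takes the encoder explicitly. Below is a minimal sketch of the two call sites, assuming only the constructors shown in the diff; the scratch directory and file names are illustrative.

```go
package main

import (
	"context"
	"log"
	"os"
	"path/filepath"

	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	dir, err := os.MkdirTemp("", "index-writer-demo") // illustrative scratch dir
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Default constructor: equivalent to the old NewWriter(ctx, fn, nil),
	// which fell back to the raw encoder when nil was passed.
	w1, err := index.NewWriter(context.Background(), filepath.Join(dir, "index-default"))
	if err != nil {
		log.Fatal(err)
	}
	// An empty index must still result in a readable file (see the test above).
	if err := w1.Close(); err != nil {
		log.Fatal(err)
	}

	// Explicit constructor: same behavior here, but this call site is where
	// an instrumented or compressed encoder could be swapped in.
	w2, err := index.NewWriterWithEncoder(context.Background(), filepath.Join(dir, "index-explicit"), index.EncodePostingsRaw)
	if err != nil {
		log.Fatal(err)
	}
	if err := w2.Close(); err != nil {
		log.Fatal(err)
	}
}
```
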
@@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}
 
-	iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename), nil)
+	iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
 	require.NoError(t, err)
 
 	syms := []string{}
@@ -475,7 +475,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), nil)
+	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
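
Taken together, the compactor now has three constructors. A hedged sketch of the resulting call sites, not part of the diff: passing `nil` for the merge function relies on the compactor falling back to its default vertical merger, as the updated tests above do, and the variable names are illustrative.

```go
package main

import (
	"context"
	stdlog "log"

	kitlog "github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	ctx := context.Background()
	logger := kitlog.NewNopLogger()
	ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5)

	// Positional form, as restored by this change: nil keeps the default
	// chunk series merger.
	c1, err := tsdb.NewLeveledCompactor(ctx, nil, logger, ranges, chunkenc.NewPool(), nil)
	if err != nil {
		stdlog.Fatal(err)
	}
	_ = c1

	// Chunk-size variant, mirroring the open() call site in db.go above;
	// 0 selects the default chunk segment size.
	c2, err := tsdb.NewLeveledCompactorWithChunkSize(ctx, nil, logger, ranges, chunkenc.NewPool(), 0, nil)
	if err != nil {
		stdlog.Fatal(err)
	}
	_ = c2

	// Options form: the only entry point that exposes the PE field.
	c3, err := tsdb.NewLeveledCompactorWithOptions(ctx, nil, logger, ranges, chunkenc.NewPool(), tsdb.LeveledCompactorOptions{
		PE: index.EncodePostingsRaw,
	})
	if err != nil {
		stdlog.Fatal(err)
	}
	_ = c3
}
```

Both wrappers funnel into `NewLeveledCompactorWithOptions`, so future knobs can be added to `LeveledCompactorOptions` without another constructor-signature change.
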