diff --git a/tsdb/compact.go b/tsdb/compact.go index c5bd2ed2a67..b4c194de97f 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -75,14 +75,15 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { - metrics *CompactorMetrics - logger log.Logger - ranges []int64 - chunkPool chunkenc.Pool - ctx context.Context - maxBlockChunkSegmentSize int64 - mergeFunc storage.VerticalChunkSeriesMergeFunc - postingsEncoder index.PostingsEncoder + metrics *CompactorMetrics + logger log.Logger + ranges []int64 + chunkPool chunkenc.Pool + ctx context.Context + maxBlockChunkSegmentSize int64 + mergeFunc storage.VerticalChunkSeriesMergeFunc + postingsEncoder index.PostingsEncoder + enableOverlappingCompaction bool } type CompactorMetrics struct { @@ -153,18 +154,23 @@ type LeveledCompactorOptions struct { MaxBlockChunkSegmentSize int64 // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used. MergeFunc storage.VerticalChunkSeriesMergeFunc + // EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled. + // It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction. + EnableOverlappingCompaction bool } func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ - MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, - MergeFunc: mergeFunc, + MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, }) } func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) { return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{ - MergeFunc: mergeFunc, + MergeFunc: mergeFunc, + EnableOverlappingCompaction: true, }) } @@ -191,14 +197,15 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer pe = index.EncodePostingsRaw } return &LeveledCompactor{ - ranges: ranges, - chunkPool: pool, - logger: l, - metrics: newCompactorMetrics(r), - ctx: ctx, - maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, - mergeFunc: mergeFunc, - postingsEncoder: pe, + ranges: ranges, + chunkPool: pool, + logger: l, + metrics: newCompactorMetrics(r), + ctx: ctx, + maxBlockChunkSegmentSize: maxBlockChunkSegmentSize, + mergeFunc: mergeFunc, + postingsEncoder: pe, + enableOverlappingCompaction: opts.EnableOverlappingCompaction, }, nil } @@ -317,6 +324,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { // selectOverlappingDirs returns all dirs with overlapping time ranges. // It expects sorted input by mint and returns the overlapping dirs in the same order as received. func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { + if !c.enableOverlappingCompaction { + return nil + } if len(ds) < 2 { return nil }