Skip to content

Commit

Permalink
tsdb: add enable overlapping compaction
Browse files Browse the repository at this point in the history
This functionality is needed in downstream projects because they have a
separate component that does compaction.

Upstreaming
https://github.com/grafana/mimir-prometheus/blob/7c8e9a2a76fc729e9078889782928b2fdfe240e9/tsdb/compact.go#L323-L325.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Jan 12, 2024
1 parent 6150e1c commit 3a48adc
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions tsdb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ type Compactor interface {

// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
metrics *CompactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
metrics *CompactorMetrics
logger log.Logger
ranges []int64
chunkPool chunkenc.Pool
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
enableOverlappingCompaction bool
}

type CompactorMetrics struct {
Expand Down Expand Up @@ -153,18 +154,23 @@ type LeveledCompactorOptions struct {
MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc
// EnableOverlappingCompaction enables compaction of overlapping blocks. In Prometheus it is always enabled.
// It is useful for downstream projects like Mimir, Cortex, Thanos where they have a separate component that does compaction.
EnableOverlappingCompaction bool
}

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc,
MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}

func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
MergeFunc: mergeFunc,
MergeFunc: mergeFunc,
EnableOverlappingCompaction: true,
})
}

Expand All @@ -191,14 +197,15 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
pe = index.EncodePostingsRaw
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
enableOverlappingCompaction: opts.EnableOverlappingCompaction,
}, nil
}

Expand Down Expand Up @@ -317,6 +324,9 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
// selectOverlappingDirs returns all dirs with overlapping time ranges.
// It expects sorted input by mint and returns the overlapping dirs in the same order as received.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if !c.enableOverlappingCompaction {
return nil
}
if len(ds) < 2 {
return nil
}
Expand Down

0 comments on commit 3a48adc

Please sign in to comment.