Skip to content

Commit

Permalink
stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed Jul 16, 2024
1 parent 615f61d commit 37dedf8
Showing 1 changed file with 7 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private void populateQueue(SegmentTimeline timeline, List<Interval> skipInterval
}
}

updateQueue(dataSource);
findSegmentsToCompact(dataSource);
}

public CompactionStatistics totalCompactedStatistics()
Expand Down Expand Up @@ -202,21 +202,9 @@ public SegmentsToCompact next()
final List<DataSegment> resultSegments = entry.getSegments();
Preconditions.checkState(!resultSegments.isEmpty(), "Queue entry must not be empty");

updateQueue(dataSource);
return entry;
}

/**
* Find the next segments to compact for the given dataSource and add them to the queue.
*/
private void updateQueue(String dataSourceName)
{
final SegmentsToCompact segmentsToCompact = findSegmentsToCompact(dataSourceName, config);
if (!segmentsToCompact.isEmpty()) {
queue.add(segmentsToCompact);
}
}

/**
* Iterates compactible segments in a {@link SegmentTimeline}.
*/
Expand Down Expand Up @@ -307,22 +295,18 @@ public List<DataSegment> next()
}

/**
* Finds segments to compact together for the given datasource.
*
* @return An empty {@link SegmentsToCompact} if there are no eligible candidates.
* Finds segments to compact together for the given datasource and adds them to
* the priority queue.
*/
private SegmentsToCompact findSegmentsToCompact(
final String dataSourceName,
final DataSourceCompactionConfig config
)
private void findSegmentsToCompact(String dataSourceName)
{
final CompactibleSegmentIterator compactibleSegmentIterator = timelineIterator.get();
if (compactibleSegmentIterator == null) {
log.warn(
"Skipping compaction for datasource[%s] as there is no compactible segment in its timeline.",
dataSourceName
);
return SegmentsToCompact.empty();
return;
}

final long inputSegmentSize = config.getInputSegmentSizeBytes();
Expand Down Expand Up @@ -362,15 +346,14 @@ private SegmentsToCompact findSegmentsToCompact(
// Skip these candidate segments as we have already compacted this interval
} else {
compactedIntervals.add(interval);
return candidates;
queue.add(candidates);
}
} else {
return candidates;
queue.add(candidates);
}
}

log.debug("No more segments to compact for datasource[%s].", dataSourceName);
return SegmentsToCompact.empty();
}

/**
Expand Down

0 comments on commit 37dedf8

Please sign in to comment.