diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index b2cef37cef74..84bf5442fb0e 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -62,7 +62,7 @@ public class NewestSegmentFirstPolicyBenchmark { private static final String DATA_SOURCE_PREFIX = "dataSource_"; - private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(); + private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(null); @Param("100") private int numDataSources; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java index 21683bf6daa3..02af6727f64a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSchedulerImpl.java @@ -81,7 +81,7 @@ * - [x] handle success and failure inside CompactionStatusTracker * - [x] make policy serializable * - [ ] handle priority datasource in policy - * - [ ] add another policy + * - [ ] add another policy - newestSegmentFirst, smallestSegmentFirst, auto * - [x] enable segments polling if overlord is standalone * - [ ] test on cluster - standalone, coordinator-overlord * - [ ] unit tests diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java index de1af4e4b9b0..47efe462bc1b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java @@ -40,7 +40,7 @@ public class CoordinatorCompactionConfig private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE; private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false; private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = CompactionEngine.NATIVE; - private static final CompactionSegmentSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy(); + private static final CompactionSegmentSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy(null); private final List compactionConfigs; private final double compactionTaskSlotRatio; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/BaseSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/BaseSegmentSearchPolicy.java index 7342de11c3df..0e74f4299c7e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/BaseSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/BaseSegmentSearchPolicy.java @@ -19,7 +19,15 @@ package org.apache.druid.server.coordinator.compact; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.timeline.SegmentTimeline; +import org.joda.time.Interval; + import javax.annotation.Nullable; +import java.util.Comparator; +import java.util.List; +import java.util.Map; import java.util.Objects; public abstract class BaseSegmentSearchPolicy implements CompactionSegmentSearchPolicy @@ -34,11 +42,32 @@ protected BaseSegmentSearchPolicy( } @Nullable - public String getPriorityDatasource() + @JsonProperty + public final String getPriorityDatasource() { return priorityDatasource; } + @Override + public CompactionSegmentIterator createIterator( + Map compactionConfigs, + Map dataSources, + Map> skipIntervals, + CompactionStatusTracker statusTracker + ) + { + return new PriorityBasedCompactionSegmentIterator( + compactionConfigs, + dataSources, + skipIntervals, + getPriorityDatasource(), + getSegmentComparator(), + statusTracker + ); + } + + protected abstract Comparator getSegmentComparator(); + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java index d18955866557..5d71f8f6edd1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionSegmentSearchPolicy.java @@ -34,7 +34,8 @@ */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class) + @JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class), + @JsonSubTypes.Type(name = "smallestSegmentFirst", value = SmallestSegmentFirstPolicy.class) }) public interface CompactionSegmentSearchPolicy { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java index 410e2ee4841e..a63aef39315d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/CompactionStatus.java @@ -74,12 +74,12 @@ private enum State ); private final State state; - private final String reasonToCompact; + private final String reason; private CompactionStatus(State state, String reason) { this.state = state; - this.reasonToCompact = reason; + this.reason = reason; } public boolean isComplete() @@ -92,9 +92,9 @@ public boolean isSkipped() return state == State.SKIPPED; } - public String getReasonToCompact() + public String getReason() { - return reasonToCompact; + return reason; } private static CompactionStatus incomplete(String reasonFormat, Object... args) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java index 5e2d666a8f08..f38d6e1e6d65 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/DataSourceCompactibleSegmentIterator.java @@ -314,12 +314,20 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti skippedSegmentStats.increment(candidates.getStats()); log.warn( "Skipping compaction for datasource[%s], interval[%s] due to reason[%s].", - dataSource, interval, compactionStatus.getReasonToCompact() + dataSource, interval, compactionStatus.getReason() ); + } else if (config.getGranularitySpec() != null + && config.getGranularitySpec().getSegmentGranularity() != null) { + if (compactedIntervals.contains(interval)) { + // Skip this interval + } else { + compactedIntervals.add(interval); + queue.add(candidates); + } } else { - log.info( + log.debug( "Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].", - dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact() + dataSource, interval, candidates.size(), compactionStatus.getReason() ); queue.add(candidates); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java index f7f0ebbff620..55ca55eafc42 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicy.java @@ -20,13 +20,12 @@ package org.apache.druid.server.coordinator.compact; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.curator.shaded.com.google.common.collect.Ordering; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.server.coordinator.DataSourceCompactionConfig; -import org.apache.druid.timeline.SegmentTimeline; -import org.joda.time.Interval; -import java.util.List; -import java.util.Map; +import javax.annotation.Nullable; +import java.util.Comparator; /** * This policy searches segments for compaction from newest to oldest. @@ -34,26 +33,17 @@ public class NewestSegmentFirstPolicy extends BaseSegmentSearchPolicy { @JsonCreator - public NewestSegmentFirstPolicy() + public NewestSegmentFirstPolicy( + @JsonProperty("priorityDatasource") @Nullable String priorityDatasource + ) { - super(null); + super(priorityDatasource); } @Override - public CompactionSegmentIterator createIterator( - Map compactionConfigs, - Map dataSources, - Map> skipIntervals, - CompactionStatusTracker statusTracker - ) + protected Comparator getSegmentComparator() { - return new PriorityBasedCompactionSegmentIterator( - compactionConfigs, - dataSources, - skipIntervals, - (o1, o2) -> Comparators.intervalsByStartThenEnd() - .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()), - statusTracker - ); + return (o1, o2) -> Comparators.intervalsByStartThenEnd() + .compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java index a3074a097a1d..bb0006aef2d0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/PriorityBasedCompactionSegmentIterator.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator.compact; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -27,6 +28,8 @@ import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; +import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -49,11 +52,24 @@ public PriorityBasedCompactionSegmentIterator( Map compactionConfigs, Map datasourceToTimeline, Map> skipIntervals, + @Nullable String priorityDatasource, Comparator segmentPriority, CompactionStatusTracker statusTracker ) { - this.queue = new PriorityQueue<>(segmentPriority); + final Comparator comparator; + if (priorityDatasource == null) { + comparator = segmentPriority; + } else { + comparator = Ordering.compound( + Arrays.asList( + Ordering.explicit(priorityDatasource).onResultOf(entry -> entry.getFirst().getDataSource()), + segmentPriority + ) + ); + } + this.queue = new PriorityQueue<>(comparator); + this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size()); compactionConfigs.forEach((datasource, config) -> { if (config == null) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/compact/SmallestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/compact/SmallestSegmentFirstPolicy.java new file mode 100644 index 000000000000..1ab0f7d8e698 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/compact/SmallestSegmentFirstPolicy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.compact; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Ordering; + +import javax.annotation.Nullable; +import java.util.Comparator; + +/** + * This policy searches segments for compaction from smallest to largest. + */ +public class SmallestSegmentFirstPolicy extends BaseSegmentSearchPolicy +{ + @JsonCreator + public SmallestSegmentFirstPolicy( + @JsonProperty("priorityDatasource") @Nullable String priorityDatasource + ) + { + super(priorityDatasource); + } + + @Override + protected Comparator getSegmentComparator() + { + return Ordering.natural() + .onResultOf(entry -> entry.getTotalBytes() / entry.size()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java index 7adecff8e850..5d5d10cec67d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/compact/NewestSegmentFirstPolicyTest.java @@ -63,6 +63,7 @@ import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -82,8 +83,14 @@ public class NewestSegmentFirstPolicyTest private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4; private final ObjectMapper mapper = new DefaultObjectMapper(); - private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(); - private final CompactionStatusTracker statusTracker = new CompactionStatusTracker(mapper); + private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null); + private CompactionStatusTracker statusTracker; + + @Before + public void setup() + { + statusTracker = new CompactionStatusTracker(mapper); + } @Test public void testLargeOffsetAndSmallSegmentInterval()