Skip to content

Commit

Permalink
Fix up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed Jul 22, 2024
1 parent d1e0cd4 commit 6829c20
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class NewestSegmentFirstPolicyBenchmark
{
private static final String DATA_SOURCE_PREFIX = "dataSource_";

private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy();
private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(null);

@Param("100")
private int numDataSources;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
* - [x] handle success and failure inside CompactionStatusTracker
* - [x] make policy serializable
* - [ ] handle priority datasource in policy
* - [ ] add another policy
* - [ ] add another policy - newestSegmentFirst, smallestSegmentFirst, auto
* - [x] enable segments polling if overlord is standalone
* - [ ] test on cluster - standalone, coordinator-overlord
* - [ ] unit tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class CoordinatorCompactionConfig
private static final int DEFAULT_MAX_COMPACTION_TASK_SLOTS = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_AUTO_SCALE_SLOTS = false;
private static final CompactionEngine DEFAULT_COMPACTION_ENGINE = CompactionEngine.NATIVE;
private static final CompactionSegmentSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy();
private static final CompactionSegmentSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy(null);

private final List<DataSourceCompactionConfig> compactionConfigs;
private final double compactionTaskSlotRatio;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@

package org.apache.druid.server.coordinator.compact;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public abstract class BaseSegmentSearchPolicy implements CompactionSegmentSearchPolicy
Expand All @@ -34,11 +42,32 @@ protected BaseSegmentSearchPolicy(
}

@Nullable
public String getPriorityDatasource()
@JsonProperty
public final String getPriorityDatasource()
{
return priorityDatasource;
}

@Override
public CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals,
CompactionStatusTracker statusTracker
)
{
return new PriorityBasedCompactionSegmentIterator(
compactionConfigs,
dataSources,
skipIntervals,
getPriorityDatasource(),
getSegmentComparator(),
statusTracker
);
}

protected abstract Comparator<SegmentsToCompact> getSegmentComparator();

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class)
@JsonSubTypes.Type(name = "newestSegmentFirst", value = NewestSegmentFirstPolicy.class),
@JsonSubTypes.Type(name = "smallestSegmentFirst", value = SmallestSegmentFirstPolicy.class)
})
public interface CompactionSegmentSearchPolicy
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ private enum State
);

private final State state;
private final String reasonToCompact;
private final String reason;

private CompactionStatus(State state, String reason)
{
this.state = state;
this.reasonToCompact = reason;
this.reason = reason;
}

public boolean isComplete()
Expand All @@ -92,9 +92,9 @@ public boolean isSkipped()
return state == State.SKIPPED;
}

public String getReasonToCompact()
public String getReason()
{
return reasonToCompact;
return reason;
}

private static CompactionStatus incomplete(String reasonFormat, Object... args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,12 +314,20 @@ private void findAndEnqueueSegmentsToCompact(CompactibleSegmentIterator compacti
skippedSegmentStats.increment(candidates.getStats());
log.warn(
"Skipping compaction for datasource[%s], interval[%s] due to reason[%s].",
dataSource, interval, compactionStatus.getReasonToCompact()
dataSource, interval, compactionStatus.getReason()
);
} else if (config.getGranularitySpec() != null
&& config.getGranularitySpec().getSegmentGranularity() != null) {
if (compactedIntervals.contains(interval)) {
// Skip this interval
} else {
compactedIntervals.add(interval);
queue.add(candidates);
}
} else {
log.info(
log.debug(
"Datasource[%s], interval[%s] has [%d] segments that need to be compacted because [%s].",
dataSource, interval, candidates.size(), compactionStatus.getReasonToCompact()
dataSource, interval, candidates.size(), compactionStatus.getReason()
);
queue.add(candidates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,30 @@
package org.apache.druid.server.coordinator.compact;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.curator.shaded.com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.joda.time.Interval;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import java.util.Comparator;

/**
* This policy searches segments for compaction from newest to oldest.
*/
public class NewestSegmentFirstPolicy extends BaseSegmentSearchPolicy
{
@JsonCreator
public NewestSegmentFirstPolicy()
public NewestSegmentFirstPolicy(
@JsonProperty("priorityDatasource") @Nullable String priorityDatasource
)
{
super(null);
super(priorityDatasource);
}

@Override
public CompactionSegmentIterator createIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> dataSources,
Map<String, List<Interval>> skipIntervals,
CompactionStatusTracker statusTracker
)
protected Comparator<SegmentsToCompact> getSegmentComparator()
{
return new PriorityBasedCompactionSegmentIterator(
compactionConfigs,
dataSources,
skipIntervals,
(o1, o2) -> Comparators.intervalsByStartThenEnd()
.compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval()),
statusTracker
);
return (o1, o2) -> Comparators.intervalsByStartThenEnd()
.compare(o2.getUmbrellaInterval(), o1.getUmbrellaInterval());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.apache.druid.server.coordinator.compact;

import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -49,11 +52,24 @@ public PriorityBasedCompactionSegmentIterator(
Map<String, DataSourceCompactionConfig> compactionConfigs,
Map<String, SegmentTimeline> datasourceToTimeline,
Map<String, List<Interval>> skipIntervals,
@Nullable String priorityDatasource,
Comparator<SegmentsToCompact> segmentPriority,
CompactionStatusTracker statusTracker
)
{
this.queue = new PriorityQueue<>(segmentPriority);
final Comparator<SegmentsToCompact> comparator;
if (priorityDatasource == null) {
comparator = segmentPriority;
} else {
comparator = Ordering.compound(
Arrays.asList(
Ordering.explicit(priorityDatasource).onResultOf(entry -> entry.getFirst().getDataSource()),
segmentPriority
)
);
}
this.queue = new PriorityQueue<>(comparator);

this.datasourceIterators = Maps.newHashMapWithExpectedSize(datasourceToTimeline.size());
compactionConfigs.forEach((datasource, config) -> {
if (config == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.compact;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Ordering;

import javax.annotation.Nullable;
import java.util.Comparator;

/**
* This policy searches segments for compaction from smallest to largest.
*/
public class SmallestSegmentFirstPolicy extends BaseSegmentSearchPolicy
{
@JsonCreator
public SmallestSegmentFirstPolicy(
@JsonProperty("priorityDatasource") @Nullable String priorityDatasource
)
{
super(priorityDatasource);
}

@Override
protected Comparator<SegmentsToCompact> getSegmentComparator()
{
return Ordering.natural()
.onResultOf(entry -> entry.getTotalBytes() / entry.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
Expand All @@ -82,8 +83,14 @@ public class NewestSegmentFirstPolicyTest
private static final long DEFAULT_SEGMENT_SIZE = 1000;
private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4;
private final ObjectMapper mapper = new DefaultObjectMapper();
private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy();
private final CompactionStatusTracker statusTracker = new CompactionStatusTracker(mapper);
private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null);
private CompactionStatusTracker statusTracker;

@Before
public void setup()
{
statusTracker = new CompactionStatusTracker(mapper);
}

@Test
public void testLargeOffsetAndSmallSegmentInterval()
Expand Down

0 comments on commit 6829c20

Please sign in to comment.