Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] add TTL to rollup-jobs #369

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions app/com/arpnetworking/rollups/RollupDefinition.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public final class RollupDefinition implements Serializable, ConsistentHashingRo
private final Instant _startTime;
private final ImmutableMap<String, String> _filterTags;
private final ImmutableMultimap<String, String> _allMetricTags;
private final Instant _giveUpAfter;

private RollupDefinition(final Builder builder) {
_sourceMetricName = builder._sourceMetricName;
Expand All @@ -48,6 +49,7 @@ private RollupDefinition(final Builder builder) {
_startTime = builder._startTime;
_filterTags = builder._filterTags;
_allMetricTags = builder._allMetricTags;
_giveUpAfter = builder._giveUpAfter;
}

public String getSourceMetricName() {
Expand All @@ -74,6 +76,10 @@ public ImmutableMultimap<String, String> getAllMetricTags() {
return _allMetricTags;
}

public Instant getGiveUpAfter() {
return _giveUpAfter;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -88,12 +94,13 @@ public boolean equals(final Object o) {
&& _period == that._period
&& _startTime.equals(that._startTime)
&& _filterTags.equals(that._filterTags)
&& _allMetricTags.equals(that._allMetricTags);
&& _allMetricTags.equals(that._allMetricTags)
&& _giveUpAfter.equals(that._giveUpAfter);
}

@Override
public int hashCode() {
return Objects.hash(_sourceMetricName, _destinationMetricName, _period, _startTime, _filterTags, _allMetricTags);
return Objects.hash(_sourceMetricName, _destinationMetricName, _period, _startTime, _filterTags, _allMetricTags, _giveUpAfter);
}

@Override
Expand All @@ -105,6 +112,7 @@ public String toString() {
.add("_startTime", _startTime)
.add("_filterTags", _filterTags)
.add("_allMetricTags", _allMetricTags)
.add("_giveUpAfter", _giveUpAfter)
.toString();
}

Expand Down Expand Up @@ -135,6 +143,8 @@ public static final class Builder extends OvalBuilder<RollupDefinition> {
private ImmutableMap<String, String> _filterTags = ImmutableMap.of();
@NotNull
private ImmutableMultimap<String, String> _allMetricTags;
@NotNull
private Instant _giveUpAfter;

/**
* Creates a builder for a RollupDefinition.
Expand Down Expand Up @@ -208,5 +218,16 @@ public Builder setAllMetricTags(final ImmutableMultimap<String, String> value) {
_allMetricTags = value;
return this;
}

/**
* Sets the {@code _giveUpAfter} and returns a reference to this Builder so that the methods can be chained together.
*
* @param value the {@code _giveUpAfter} to set
* @return a reference to this Builder
*/
public Builder setGiveUpAfter(final Instant value) {
_giveUpAfter = value;
return this;
}
}
}
3 changes: 2 additions & 1 deletion app/com/arpnetworking/rollups/RollupGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ private void handleLastDataPointMessage(final LastDataPointsMessage message) {
.setSourceMetricName(message.getSourceMetricName())
.setDestinationMetricName(rollupMetricName)
.setPeriod(period)
.setAllMetricTags(message.getTags());
.setAllMetricTags(message.getTags())
.setGiveUpAfter(Instant.now().plus(period.periodCountToDuration(1)));

for (final Instant startTime : startTimes) {
rollupDefBuilder.setStartTime(startTime);
Expand Down
25 changes: 24 additions & 1 deletion app/com/arpnetworking/rollups/RollupManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import scala.concurrent.duration.FiniteDuration;

import java.io.Serializable;
import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.TreeSet;
Expand Down Expand Up @@ -140,7 +141,29 @@ private void executorFinished(final RollupExecutor.FinishRollupMessage message)
}

private Optional<RollupDefinition> getNextRollup() {
return Optional.ofNullable(_rollupDefinitions.pollFirst());
while (!_rollupDefinitions.isEmpty()) {
final RollupDefinition earliest = _rollupDefinitions.pollFirst();
if (earliest == null) {
LOGGER.error()
.setMessage("got null job out of set despite verifying non-emptiness; should be impossible")
.log();
return Optional.empty();
}
final Instant now = Instant.now();
if (earliest.getGiveUpAfter().isAfter(now)) {
return Optional.of(earliest);
}
LOGGER.warn()
.setMessage("rollup definition aged out")
.addData("rollupDefinition", earliest)
.addData("timeAgedOut", now)
.log();
final Metrics metrics = _metricsFactory.create();
metrics.addAnnotation("rollup_metric", earliest.getDestinationMetricName());
metrics.incrementCounter("rollup/manager/aged_out");
metrics.close();
}
return Optional.empty();
}

private static class RollupComparator implements Comparator<RollupDefinition>, Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public static RollupDefinition.Builder createRollupDefinitionBuilder() {
.setDestinationMetricName("my_metric_1h")
.setPeriod(RollupPeriod.HOURLY)
.setAllMetricTags(ImmutableMultimap.of("tag", "val"))
.setFilterTags(ImmutableMap.of());
.setFilterTags(ImmutableMap.of())
.setGiveUpAfter(Instant.now().plus(Duration.ofDays(1)));
}

/**
Expand Down
16 changes: 5 additions & 11 deletions test/java/com/arpnetworking/rollups/RollupExecutorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.arpnetworking.kairos.client.models.Sampling;
import com.arpnetworking.kairos.client.models.SamplingUnit;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.TestBeanFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
Expand Down Expand Up @@ -126,14 +127,7 @@ public void testFetchesNextRollup() {
_probe.expectMsg(RollupExecutor.FETCH_ROLLUP);
final RollupExecutor.FinishRollupMessage finished = ThreadLocalBuilder.build(
RollupExecutor.FinishRollupMessage.Builder.class,
b -> b.setRollupDefinition(new RollupDefinition.Builder()
.setSourceMetricName("metric")
.setDestinationMetricName("metric_1h")
.setPeriod(RollupPeriod.HOURLY)
.setStartTime(Instant.EPOCH)
.setAllMetricTags(ImmutableMultimap.of())
.build()
)
b -> b.setRollupDefinition(TestBeanFactory.createRollupDefinitionBuilder().build())
);
actor.tell(finished, ActorRef.noSender());
_probe.expectMsg(finished);
Expand Down Expand Up @@ -172,7 +166,7 @@ public void testPerformsRollup() {
_probe.expectMsg(RollupExecutor.FETCH_ROLLUP);

actor.tell(
new RollupDefinition.Builder()
TestBeanFactory.createRollupDefinitionBuilder()
.setSourceMetricName("metric")
.setDestinationMetricName("metric_1h")
.setPeriod(RollupPeriod.HOURLY)
Expand Down Expand Up @@ -219,7 +213,7 @@ public void testPerformsRollup() {

@Test
public void testBuildRollupQuery() {
RollupDefinition definition = new RollupDefinition.Builder()
RollupDefinition definition = TestBeanFactory.createRollupDefinitionBuilder()
.setSourceMetricName("my_metric")
.setDestinationMetricName("my_metric_1h")
.setAllMetricTags(ImmutableMultimap.of("tag1", "val1", "tag2", "val2"))
Expand Down Expand Up @@ -256,7 +250,7 @@ public void testBuildRollupQuery() {
RollupExecutor.buildQueryRollup(definition)
);

definition = new RollupDefinition.Builder()
definition = TestBeanFactory.createRollupDefinitionBuilder()
.setSourceMetricName("my_metric_1h")
.setDestinationMetricName("my_metric_1d")
.setAllMetricTags(ImmutableMultimap.of("tag1", "val1", "tag2", "val2"))
Expand Down
40 changes: 21 additions & 19 deletions test/java/com/arpnetworking/rollups/RollupManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.arpnetworking.metrics.impl.NoOpMetricsFactory;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.AkkaClusteringConfigFactory;
import com.arpnetworking.metrics.portal.TestBeanFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testStoreRollupDefinition() {
final TestKit testKit = new TestKit(_system);
final ActorRef actor = createActor();
final ActorRef testActor = testKit.getTestActor();
final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder()
final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder()
.setSourceMetricName("foo")
.setDestinationMetricName("foo_1h")
.setPeriod(RollupPeriod.HOURLY)
Expand All @@ -128,12 +129,7 @@ public void testDeDupsRollups() {
final TestKit testKit = new TestKit(_system);
final ActorRef actor = createActor();
final ActorRef testActor = testKit.getTestActor();
final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder()
.setSourceMetricName("foo")
.setDestinationMetricName("foo_1h")
.setPeriod(RollupPeriod.HOURLY)
.setAllMetricTags(ImmutableMultimap.of("bar", "val"))
.setStartTime(Instant.EPOCH);
final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder();
final RollupDefinition rollupDef = rollupDefBuilder.build();
final RollupDefinition rollupDef2 = rollupDefBuilder.build();
actor.tell(rollupDef, testActor);
Expand All @@ -149,14 +145,9 @@ public void testReturnsRollupsInChronologicalOrder() {
final TestKit testKit = new TestKit(_system);
final ActorRef actor = createActor();
final ActorRef testActor = testKit.getTestActor();
final RollupDefinition.Builder rollupDefBuilder = new RollupDefinition.Builder()
.setSourceMetricName("foo")
.setDestinationMetricName("foo_1h")
.setPeriod(RollupPeriod.HOURLY)
.setAllMetricTags(ImmutableMultimap.of("bar", "val"))
.setStartTime(Instant.EPOCH);
final RollupDefinition.Builder rollupDefBuilder = TestBeanFactory.createRollupDefinitionBuilder();
final RollupDefinition rollupDef = rollupDefBuilder.build();
final RollupDefinition rollupDef2 = rollupDefBuilder.setStartTime(Instant.EPOCH.plus(1, ChronoUnit.HOURS)).build();
final RollupDefinition rollupDef2 = rollupDefBuilder.setStartTime(rollupDef.getStartTime().plus(1, ChronoUnit.HOURS)).build();
actor.tell(rollupDef2, testActor);
actor.tell(rollupDef, testActor);
actor.tell(RollupFetch.getInstance(), testActor);
Expand All @@ -174,12 +165,8 @@ public void testSplitsFailedRollups() throws Exception {
final TestKit testKit = new TestKit(_system);
final ActorRef actor = createActor();
final ActorRef testActor = testKit.getTestActor();
final RollupDefinition rollupDef = new RollupDefinition.Builder()
.setSourceMetricName("foo")
.setDestinationMetricName("foo_1h")
.setPeriod(RollupPeriod.HOURLY)
final RollupDefinition rollupDef = TestBeanFactory.createRollupDefinitionBuilder()
.setAllMetricTags(ImmutableMultimap.of("tag", "val1", "tag", "val2"))
.setStartTime(Instant.EPOCH)
.build();

final ImmutableSet<RollupDefinition> children = ImmutableSet.of("val1", "val2").stream()
Expand All @@ -204,4 +191,19 @@ public void testSplitsFailedRollups() throws Exception {
actor.tell(RollupFetch.getInstance(), testActor);
testKit.expectMsg(NoMoreRollups.getInstance());
}

@Test
public void testAgesOutExpiredRollups() {
final TestKit testKit = new TestKit(_system);
final ActorRef actor = createActor();
final ActorRef testActor = testKit.getTestActor();
final RollupDefinition rollupDef = TestBeanFactory.createRollupDefinitionBuilder()
.setGiveUpAfter(Instant.now().minusMillis(1))
.build();

actor.tell(rollupDef, testActor);

actor.tell(RollupFetch.getInstance(), testActor);
testKit.expectMsg(NoMoreRollups.getInstance());
}
}