Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically create partitions for alert executions #398

Merged
merged 36 commits into from
Jun 20, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
abbbe67
WIP
cwbriones May 28, 2020
d2ce1d3
change function
cwbriones May 28, 2020
e2d8ea5
rewrite query so that it's more readable
cwbriones Jun 5, 2020
9275fa9
create and bind actor
cwbriones Jun 5, 2020
f118544
tests pass
cwbriones Jun 8, 2020
b1fc6db
propagate the exception from within the actor
cwbriones Jun 8, 2020
f0ab80a
missing Named annotation
cwbriones Jun 8, 2020
dd711da
Merge branch 'master' into create_partitions
cwbriones Jun 8, 2020
929aa31
rework guice bindings
cwbriones Jun 8, 2020
2b14dff
stop creating on 'open'
cwbriones Jun 8, 2020
2b097fe
moved impl into repository, use ddl user
cwbriones Jun 9, 2020
f015f2d
fix startup issue by specifying a ddl ebean datasource
cwbriones Jun 10, 2020
6f180c5
tests finally pass 😭
cwbriones Jun 10, 2020
fd09528
require migrations in ebean server provider
cwbriones Jun 11, 2020
85ebe05
bump the connection pool, add unit tests
cwbriones Jun 11, 2020
ac21546
suppress fb warning
cwbriones Jun 11, 2020
ed68e90
tweak pool sizes again
cwbriones Jun 11, 2020
e09c1d5
additional checks in tests
cwbriones Jun 11, 2020
e398c9e
revert pool size change
cwbriones Jun 11, 2020
cd9b68f
Revert "revert pool size change"
cwbriones Jun 12, 2020
5c837de
unused import
cwbriones Jun 12, 2020
ed117e2
Merge branch 'master' into create_partitions
cwbriones Jun 12, 2020
07e8921
Merge branch 'master' into create_partitions
cwbriones Jun 16, 2020
9ab2d01
Merge branch 'master' into create_partitions
cwbriones Jun 17, 2020
1919870
Merge branch 'master' into create_partitions
cwbriones Jun 17, 2020
7167b29
address comments
cwbriones Jun 18, 2020
2cb6e0b
use akka.actor.Status instead of mananging the future ourselves
cwbriones Jun 18, 2020
d6a5afb
feedback
cwbriones Jun 18, 2020
5ee82ea
test fixes
cwbriones Jun 18, 2020
395bc09
checkstyle
cwbriones Jun 18, 2020
a7ff3f6
method should be private
cwbriones Jun 18, 2020
4ca92f2
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
b18deac
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
466bc04
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
aa6d0b4
Merge remote-tracking branch 'upstream/master' into create_partitions
cwbriones Jun 20, 2020
96be3c9
feedback part 2
cwbriones Jun 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import com.arpnetworking.commons.builder.OvalBuilder;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.scheduling.Schedule;
import com.arpnetworking.metrics.portal.scheduling.impl.PeriodicSchedule;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import com.google.common.collect.Sets;
import io.ebean.EbeanServer;
import io.ebean.SqlQuery;

Expand All @@ -37,7 +40,7 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.persistence.PersistenceException;

Expand All @@ -51,8 +54,6 @@ public class DailyPartitionCreator extends AbstractActorWithTimers {
private static final Logger LOGGER = LoggerFactory.getLogger(DailyPartitionCreator.class);
private static final Duration TICK_INTERVAL = Duration.ofMinutes(1);
private static final String TICKER_NAME = "PERIODIC_TICK";
private static final String START_TICKING = "MSG_START_TICKING";
private static final String EXECUTE = "MSG_EXECUTE";
private final EbeanServer _ebeanServer;
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
private final PeriodicMetrics _periodicMetrics;

Expand All @@ -63,6 +64,8 @@ public class DailyPartitionCreator extends AbstractActorWithTimers {
private final Schedule _schedule;
private Optional<Instant> _lastRun;

private Set<LocalDate> _partitionCache;
cwbriones marked this conversation as resolved.
Show resolved Hide resolved

private DailyPartitionCreator(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
Expand All @@ -89,13 +92,14 @@ private DailyPartitionCreator(
_schedule = new PeriodicSchedule.Builder()
.setOffset(scheduleOffset)
.setPeriod(ChronoUnit.DAYS)
.setRunAtAndAfter(Instant.MIN)
.setRunAtAndAfter(Instant.EPOCH)
.setZone(ZoneOffset.UTC)
.build();
_lastRun = Optional.empty();
_schema = schema;
_table = table;
_clock = clock;
_partitionCache = Sets.newHashSet();
}

/**
Expand Down Expand Up @@ -131,49 +135,59 @@ public static Props props(
}

/**
* Ask the actor referenced by {@code ref} to start execution.
* <p>
* This will execute partition creation exactly once and then periodically thereafter using the actor's props.
* Ask the actor referenced by {@code ref} to stop execution.
*
* @param ref a {@code DailyPartitionCreator}.
* @param timeout timeout for the operation
* @throws ExecutionException if an exception was thrown during execution.
* @throws InterruptedException if the actor does not reply within the allotted timeout, or if the actor thread was
* @throws InterruptedException if the actor does not stop within the allotted timeout, or if the actor thread was
* interrupted for other reasons.
*/
public static void start(
final ActorRef ref,
final Duration timeout
) throws ExecutionException, InterruptedException {
Patterns.ask(
ref,
START_TICKING,
timeout
)
.thenCompose(reply -> {
@SuppressWarnings("unchecked") final Optional<Throwable> o = (Optional<Throwable>) reply;
final CompletableFuture<Void> future = new CompletableFuture<>();
o.ifPresent(future::completeExceptionally);
future.complete(null);
return future;
})
.toCompletableFuture()
.get();
public static void stop(final ActorRef ref, final Duration timeout) throws ExecutionException, InterruptedException {
Patterns.gracefulStop(ref, timeout).toCompletableFuture().get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth the method? Especially since it has no logic specific to this actor. Moreover, you could use PatternsCS (assuming we're on a new enough version of Akka in MP), which I believe would simplify the call to stop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can inline this; I had a method to be consistent with the start call but since that was removed there's no need for the consistency.

As far as the APIs used, PatternsCS and Patterns were merged in Akka 2.5.19 (we're running 2.5.20).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just looked and MAD is running Akka 2.5.16 so it makes sense that using Patterns works here but not over there.

}

/**
* Ask the actor referenced by {@code ref} to stop execution.
* Ask the actor referenced by {@code ref} to create the partition(s) needed
* for the given date.
*
* @param ref a {@code DailyPartitionCreator}.
* @param date The partition date
* @param timeout timeout for the operation
* @throws ExecutionException if an exception was thrown during execution.
* @throws InterruptedException if the actor does not stop within the allotted timeout, or if the actor thread was
* @throws InterruptedException if the actor does not respond within the allotted timeout, or if the actor thread was
* interrupted for other reasons.
*/
public static void stop(final ActorRef ref, final Duration timeout) throws ExecutionException, InterruptedException {
Patterns.gracefulStop(ref, timeout).toCompletableFuture().get();
public static void ensurePartitionExistsForDate(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely without merit, and not as concerned, but at least may be simpler with PatternsCS. I believe we also doubled up on the timeouts with this pattern in MAD and CAGG; e.g.

ArpNetworking/metrics-aggregator-daemon@158884e#diff-fed66f6be12dfd97b6fb62212dd0d6ebR52

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above PatternsCS and Patterns are essentially the same API for this version of Akka - but I'll add the timeouts. I'm pretty sure I'm the one that asked you to double up on that code to begin with 😄

final ActorRef ref,
final LocalDate date,
final Duration timeout
) throws ExecutionException, InterruptedException {
final CreateForRange createPartitions = new CreateForRange.Builder()
.setStart(date)
.setEnd(date.plusDays(1))
.build();
Patterns.ask(
ref,
createPartitions,
timeout
)
.toCompletableFuture()
.get();
}

@Override
public void preStart() {
LOGGER.info().setMessage("Starting execution timer")
.addData("schema", _schema)
.addData("table", _table)
.addData("lookahead", _lookahead)
.log();
getSelf().tell(TICK, getSelf());
getTimers().startPeriodicTimer(TICKER_NAME, TICK, TICK_INTERVAL);
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
}


@Override
public void postStop() throws Exception {
super.postStop();
Expand All @@ -187,9 +201,13 @@ public void postStop() throws Exception {
@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals(START_TICKING, msg -> startTicking())
.matchEquals(TICK, msg -> tick())
.matchEquals(EXECUTE, msg -> execute())
.match(CreateForRange.class, msg -> {
final Status.Status resp = execute(msg.getStart(), msg.getEnd());
if (!getSender().equals(getSelf())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm. Is this what you were looking for?

https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html

See: "Ignoring replies"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly what I was looking for, but that's Akka 2.6.XX using typed actors so it's not available here 😢

getSender().tell(resp, getSelf());
}
})
.build();
}

Expand All @@ -200,35 +218,42 @@ private void recordCounter(final String metricName, final long value) {

// Message handlers

private void startTicking() {
if (getTimers().isTimerActive(TICKER_NAME)) {
getSender().tell(Optional.of(new IllegalStateException("Timer already started")), getSelf());
return;
}
LOGGER.info().setMessage("Starting execution timer")
.addData("schema", _schema)
.addData("table", _table)
.addData("lookahead", _lookahead)
.log();
final Optional<Exception> executeResult = execute();
getTimers().startPeriodicTimer(TICKER_NAME, TICK, TICK_INTERVAL);
getSender().tell(executeResult, getSelf());
}

private void tick() {
recordCounter("tick", 1);

final Instant now = _clock.instant();
if (_schedule.nextRun(_lastRun).map(run -> run.isBefore(now)).orElse(true)) {
getSelf().tell(EXECUTE, getSelf());
final LocalDate startDate = ZonedDateTime.ofInstant(now, _clock.getZone()).toLocalDate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need a local timezone here? What happens if two boxes have different timezones? Would this work equally well just fixed to UTC?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a local time zone, it is UTC regardless of host. I initialized the clock to Clock.systemUTC(). I can be explicit here and just specify UTC a second time.

final LocalDate endDate = startDate.plusDays(_lookahead);
cwbriones marked this conversation as resolved.
Show resolved Hide resolved

final CreateForRange createPartitions = new CreateForRange.Builder()
.setStart(startDate)
.setEnd(endDate)
.build();
getSelf().tell(createPartitions, getSelf());
}
}

// Wrapper to propagate any errors that occurred to the caller.
// This is really only useful on a call to `start`.
private Optional<Exception> execute() {
final LocalDate startDate = ZonedDateTime.ofInstant(_clock.instant(), _clock.getZone()).toLocalDate();
final LocalDate endDate = startDate.plusDays(_lookahead);
private Status.Status execute(final LocalDate startDate, final LocalDate endDate) {
LocalDate d = startDate;
boolean allPartitionsExist = true;
while (!d.equals(endDate)) {
if (!_partitionCache.contains(d)) {
allPartitionsExist = false;
break;
}
d = d.plusDays(1);
}
if (allPartitionsExist) {
LOGGER.debug()
.setMessage("partitions already exist, ignoring execute request")
.addData("schema", _schema)
.addData("table", _table)
.addData("startDate", startDate)
.addData("endDate", endDate)
.log();
return new Status.Success(null);
}

LOGGER.info()
.setMessage("Creating daily partitions for table")
Expand All @@ -238,12 +263,13 @@ private Optional<Exception> execute() {
.addData("endDate", endDate)
.log();

Optional<Exception> error = Optional.empty();
Status.Status status = new Status.Success(null);
try {
execute(_schema, _table, startDate, endDate);
_lastRun = Optional.of(_clock.instant());
updateCache(startDate, endDate);
} catch (final PersistenceException e) {
error = Optional.of(e);
status = new Status.Failure(e);
LOGGER.error()
.setMessage("Failed to create daily partitions for table")
.addData("schema", _schema)
Expand All @@ -253,9 +279,17 @@ private Optional<Exception> execute() {
.setThrowable(e)
.log();
} finally {
recordCounter("create", error.isPresent() ? 0 : 1);
recordCounter("create", status instanceof Status.Success ? 0 : 1);
}
return status;
}

private void updateCache(final LocalDate start, final LocalDate end) {
LocalDate date = start;
while (!date.equals(end)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these are local dates, I believe they won't be equal if they have different locales (zones) even if adjusted it's the same point in time. Is this what we want for comparison? (related: see comment about zones above)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand the concern, it's not applicable here because LocalDate instances do not have timezone information - it's essentially just day/month/year

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I'm treating all time in here as UTC which is why the zone offset information was dropped)

_partitionCache.add(date);
date = date.plusDays(1);
}
return error;
}

/**
Expand Down Expand Up @@ -284,4 +318,53 @@ protected void execute(

sql.findOneOrEmpty().orElseThrow(() -> new PersistenceException("Expected a single empty result."));
}

/**
 * Immutable message asking the actor to create the partitions covering a
 * range of dates. The handlers in this file walk the range one day at a time
 * from {@code start} until they reach {@code end}, so {@code end} itself is
 * not included.
 */
private static final class CreateForRange {
    private final LocalDate _start;
    private final LocalDate _end;

    private CreateForRange(final Builder builder) {
        this._start = builder._start;
        this._end = builder._end;
    }

    /**
     * Gets the last date of the range (not itself covered).
     *
     * @return the end date.
     */
    public LocalDate getEnd() {
        return this._end;
    }

    /**
     * Gets the first date of the range.
     *
     * @return the start date.
     */
    public LocalDate getStart() {
        return this._start;
    }

    /**
     * Builder for {@link CreateForRange} messages.
     */
    static final class Builder extends OvalBuilder<CreateForRange> {
        private LocalDate _start;
        private LocalDate _end;

        Builder() {
            super(CreateForRange::new);
        }

        /**
         * Sets the end.
         *
         * @param end the end.
         * @return This instance of {@code Builder} for chaining.
         */
        public Builder setEnd(final LocalDate end) {
            this._end = end;
            return this;
        }

        /**
         * Sets the start.
         *
         * @param start the start.
         * @return This instance of {@code Builder} for chaining.
         */
        public Builder setStart(final LocalDate start) {
            this._start = start;
            return this;
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -124,12 +126,7 @@ private AlertExecution findOrCreateAlertExecution(
public void open() {
assertIsOpen(false);
LOGGER.debug().setMessage("Opening DatabaseAlertExecutionRepository").log();
try {
_partitionCreator = _actorSystem.actorOf(_props);
DailyPartitionCreator.start(_partitionCreator, Duration.ofSeconds(5));
} catch (final ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to start partition creator", e);
}
_partitionCreator = _actorSystem.actorOf(_props);
_isOpen.set(true);
}

Expand Down Expand Up @@ -208,6 +205,7 @@ public Optional<JobExecution<AlertEvaluationResult>> getLastCompleted(final UUID
// Records the start of an alert job run: verifies the repository is open,
// asks the partition creator to ensure a partition exists for the scheduled
// instant's date, then delegates to the helper.
@Override
public void jobStarted(final UUID alertId, final Organization organization, final Instant scheduled) {
assertIsOpen();
// Runs before the helper call; throws a RuntimeException if the partition could not be ensured.
ensurePartition(scheduled);
_helper.jobStarted(alertId, organization, scheduled);
}

Expand All @@ -219,12 +217,14 @@ public void jobSucceeded(
final AlertEvaluationResult result
) {
assertIsOpen();
ensurePartition(scheduled);
_helper.jobSucceeded(alertId, organization, scheduled, result);
}

// Records the failure of an alert job run: verifies the repository is open,
// asks the partition creator to ensure a partition exists for the scheduled
// instant's date, then delegates to the helper with the error.
@Override
public void jobFailed(final UUID alertId, final Organization organization, final Instant scheduled, final Throwable error) {
assertIsOpen();
// Runs before the helper call; throws a RuntimeException if the partition could not be ensured.
ensurePartition(scheduled);
_helper.jobFailed(alertId, organization, scheduled, error);
}

Expand All @@ -238,4 +238,21 @@ private void assertIsOpen(final boolean expectedState) {
expectedState ? "open" : "closed"));
}
}

/**
 * Asks the partition creator actor to make sure the partition for the UTC
 * date of {@code scheduled} exists, blocking the calling thread until the
 * actor replies or the one-second ask timeout elapses.
 *
 * @param scheduled the instant whose UTC calendar date identifies the partition.
 * @throws IllegalStateException if the partition creator has not been created yet.
 * @throws RuntimeException if the wait is interrupted or the request fails; the
 *     original exception is preserved as the cause.
 */
private void ensurePartition(final Instant scheduled) {
    if (_partitionCreator == null) {
        throw new IllegalStateException("partitionCreator should be non-null when open");
    }
    try {
        DailyPartitionCreator.ensurePartitionExistsForDate(
                _partitionCreator,
                ZonedDateTime.ofInstant(scheduled, ZoneOffset.UTC).toLocalDate(),
                Duration.ofSeconds(1)
        );
    } catch (final InterruptedException e) {
        // Catching InterruptedException clears the thread's interrupt flag; restore it
        // so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("partition creation interrupted", e);
    } catch (final ExecutionException e) {
        throw new RuntimeException("Could not ensure partition for instant: " + scheduled, e);
    }
}
}
Loading