Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically create partitions for alert executions #398

Merged
merged 36 commits into from
Jun 20, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
abbbe67
WIP
cwbriones May 28, 2020
d2ce1d3
change function
cwbriones May 28, 2020
e2d8ea5
rewrite query so that it's more readable
cwbriones Jun 5, 2020
9275fa9
create and bind actor
cwbriones Jun 5, 2020
f118544
tests pass
cwbriones Jun 8, 2020
b1fc6db
propagate the exception from within the actor
cwbriones Jun 8, 2020
f0ab80a
missing Named annotation
cwbriones Jun 8, 2020
dd711da
Merge branch 'master' into create_partitions
cwbriones Jun 8, 2020
929aa31
rework guice bindings
cwbriones Jun 8, 2020
2b14dff
stop creating on 'open'
cwbriones Jun 8, 2020
2b097fe
moved impl into repository, use ddl user
cwbriones Jun 9, 2020
f015f2d
fix startup issue by specifying a ddl ebean datasource
cwbriones Jun 10, 2020
6f180c5
tests finally pass 😭
cwbriones Jun 10, 2020
fd09528
require migrations in ebean server provider
cwbriones Jun 11, 2020
85ebe05
bump the connection pool, add unit tests
cwbriones Jun 11, 2020
ac21546
suppress fb warning
cwbriones Jun 11, 2020
ed68e90
tweak pool sizes again
cwbriones Jun 11, 2020
e09c1d5
additional checks in tests
cwbriones Jun 11, 2020
e398c9e
revert pool size change
cwbriones Jun 11, 2020
cd9b68f
Revert "revert pool size change"
cwbriones Jun 12, 2020
5c837de
unused import
cwbriones Jun 12, 2020
ed117e2
Merge branch 'master' into create_partitions
cwbriones Jun 12, 2020
07e8921
Merge branch 'master' into create_partitions
cwbriones Jun 16, 2020
9ab2d01
Merge branch 'master' into create_partitions
cwbriones Jun 17, 2020
1919870
Merge branch 'master' into create_partitions
cwbriones Jun 17, 2020
7167b29
address comments
cwbriones Jun 18, 2020
2cb6e0b
use akka.actor.Status instead of mananging the future ourselves
cwbriones Jun 18, 2020
d6a5afb
feedback
cwbriones Jun 18, 2020
5ee82ea
test fixes
cwbriones Jun 18, 2020
395bc09
checkstyle
cwbriones Jun 18, 2020
a7ff3f6
method should be private
cwbriones Jun 18, 2020
4ca92f2
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
b18deac
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
466bc04
Merge branch 'master' into create_partitions
cwbriones Jun 19, 2020
aa6d0b4
Merge remote-tracking branch 'upstream/master' into create_partitions
cwbriones Jun 20, 2020
96be3c9
feedback part 2
cwbriones Jun 20, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Copyright 2020 Dropbox, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.arpnetworking.metrics.portal.alerts.impl;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.scheduling.Schedule;
import com.arpnetworking.metrics.portal.scheduling.impl.PeriodicSchedule;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import io.ebean.EbeanServer;
import io.ebean.SqlQuery;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.persistence.PersistenceException;

/**
* An actor that will periodically create table partitions.
*
* @author Christian Briones (cbriones at dropbox dot com)
*/
public class DailyPartitionCreator extends AbstractActorWithTimers {
/* package private */ static final Object TICK = new Object();
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
private static final Object START_TICKING = new Object();
private static final Object EXECUTE = new Object();

private static final Logger LOGGER = LoggerFactory.getLogger(DailyPartitionCreator.class);
private static final Duration TICK_INTERVAL = Duration.ofMinutes(1);
private static final String TICKER_NAME = "PERIODIC_TICK";
private final EbeanServer _ebeanServer;
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
private final PeriodicMetrics _periodicMetrics;

private final int _lookahead;
private final String _schema;
private final String _table;
private final Clock _clock;
private final Schedule _schedule;
private Optional<Instant> _lastRun;

private DailyPartitionCreator(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
final String schema,
final String table,
final Duration scheduleOffset,
final int lookahead
) {
this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC());
}

/* package private */ DailyPartitionCreator(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
final String schema,
final String table,
final Duration scheduleOffset,
final int lookahead,
final Clock clock
) {
_ebeanServer = ebeanServer;
_periodicMetrics = periodicMetrics;
_lookahead = lookahead;
_schedule = new PeriodicSchedule.Builder()
.setOffset(scheduleOffset)
.setPeriod(ChronoUnit.DAYS)
.setRunAtAndAfter(Instant.MIN)
.setZone(ZoneOffset.UTC)
.build();
_lastRun = Optional.empty();
_schema = schema;
_table = table;
_clock = clock;
}

/**
* Create {@link Props} for this actor.
*
* @param ebeanServer the ebean server
* @param periodicMetrics metrics instance to use
* @param schema The database schema name
* @param table The parent table name
* @param scheduleOffset Execution offset from midnight
* @param lookahead maximum number of partitions to create in advance
* @return A new Props.
*/
public static Props props(
final EbeanServer ebeanServer,
final PeriodicMetrics periodicMetrics,
final String schema,
final String table,
final Duration scheduleOffset,
final int lookahead
) {
return Props.create(
DailyPartitionCreator.class,
() -> new DailyPartitionCreator(
ebeanServer,
periodicMetrics,
schema,
table,
scheduleOffset,
lookahead
)
);
}

/**
* Ask the actor referenced by {@code ref} to start execution.
* <p>
* This will execute partition creation exactly once and then periodically thereafter using the actor's props.
*
* @param ref an {@code DailyPartitionCreator}.
* @param timeout timeout for the operation
* @throws ExecutionException if an exception was thrown during execution.
* @throws InterruptedException if the actor does not reply within the allotted timeout, or if the actor thread was
* interrupted for other reasons.
*/
public static void start(
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
final ActorRef ref,
final Duration timeout
) throws ExecutionException, InterruptedException {
Patterns.ask(
ref,
START_TICKING,
timeout
)
.thenCompose(reply -> {
@SuppressWarnings("unchecked") final Optional<Throwable> o = (Optional<Throwable>) reply;
final CompletableFuture<Void> future = new CompletableFuture<>();
o.ifPresent(future::completeExceptionally);
future.complete(null);
return future;
})
.toCompletableFuture()
.get();
}

/**
* Ask the actor referenced by {@code ref} to stop execution.
*
* @param ref an {@code DailyPartitionCreator}.
* @param timeout timeout for the operation
* @throws ExecutionException if an exception was thrown during execution.
* @throws InterruptedException if the actor does not stop within the allotted timeout, or if the actor thread was
* interrupted for other reasons.
*/
public static void stop(final ActorRef ref, final Duration timeout) throws ExecutionException, InterruptedException {
Patterns.gracefulStop(ref, timeout).toCompletableFuture().get();
}

@Override
public void postStop() throws Exception {
super.postStop();
LOGGER.info().setMessage("Actor was stopped")
.addData("schema", _schema)
.addData("table", _table)
.addData("lookahead", _lookahead)
.log();
}

@Override
public Receive createReceive() {
return new ReceiveBuilder()
.matchEquals(START_TICKING, msg -> startTicking())
.matchEquals(TICK, msg -> tick())
.matchEquals(EXECUTE, msg -> execute())
.build();
}

private void recordCounter(final String metricName, final long value) {
final String fullMetric = String.format("partition_creator/%s/%s", _table, metricName);
_periodicMetrics.recordCounter(fullMetric, value);
}

// Message handlers

private void startTicking() {
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
if (getTimers().isTimerActive(TICKER_NAME)) {
getSender().tell(Optional.of(new IllegalStateException("Timer already started")), getSelf());
return;
}
LOGGER.info().setMessage("Starting execution timer")
.addData("schema", _schema)
.addData("table", _table)
.addData("lookahead", _lookahead)
.log();
final Optional<Exception> executeResult = execute();
getTimers().startPeriodicTimer(TICKER_NAME, TICK, TICK_INTERVAL);
getSender().tell(executeResult, getSelf());
}

private void tick() {
recordCounter("tick", 1);

final Instant now = _clock.instant();
if (_schedule.nextRun(_lastRun).map(run -> run.isBefore(now)).orElse(true)) {
getSelf().tell(EXECUTE, getSelf());
}
}

// Wrapper to propagate any errors that occurred to the caller.
// This is really only useful on a call to `start`.
private Optional<Exception> execute() {
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
final LocalDate startDate = ZonedDateTime.ofInstant(_clock.instant(), _clock.getZone()).toLocalDate();
final LocalDate endDate = startDate.plusDays(_lookahead);

LOGGER.info().setMessage("Creating daily partitions for table")
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
.addData("schema", _schema)
.addData("table", _table)
.addData("startDate", startDate)
.addData("endDate", endDate)
.log();

Optional<Exception> error = Optional.empty();
try {
execute(_schema, _table, startDate, endDate);
_lastRun = Optional.of(_clock.instant());
} catch (final PersistenceException e) {
error = Optional.of(e);
LOGGER.error().setMessage("Failed to create daily partitions for table")
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
.addData("schema", _schema)
.addData("table", _table)
.addData("startDate", startDate)
.addData("endDate", endDate)
.setThrowable(e)
.log();
}
recordCounter("create", error.isPresent() ? 0 : 1);
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
return error;
}

/**
* Create a series of daily partitions for the given parameters.
*
* @param schema the database schema
* @param table the parent table
* @param startDate the start date, inclusive.
* @param endDate the end date, exclusive.
*/
protected void execute(final String schema,
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
final String table,
final LocalDate startDate,
final LocalDate endDate) {
// While this query does not return anything meaningful semantically,
// it still returns a "non-empty" void result and so we can't use the
// ordinarily more appropriate SqlUpdate type.
final SqlQuery sql = _ebeanServer.createSqlQuery(
"select * from create_daily_partition(?::text, ?::text, ?::date, ?::date)")
.setParameter(1, schema)
.setParameter(2, table)
.setParameter(3, startDate)
.setParameter(4, endDate);

sql.findOneOrEmpty().orElseThrow(() -> new IllegalStateException("Expected a single empty result."));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a top level message handler. Throwing the exception here will just result in an uncaught message handler exception in Akka. Seems like we could do better handling, logging and instrumenting this failure. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the top-level handler - the actual handler is the execute overload with no parameters. This has the actual DB logic but the other code adds logging/metrics.

It is the case though that the wrapper code will not handle this illegal state exception which would still cause the problem you're mentioning albeit for different reasons so I could probably handle that.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,31 @@
*/
package com.arpnetworking.metrics.portal.alerts.impl;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
import com.arpnetworking.metrics.portal.alerts.AlertExecutionRepository;
import com.arpnetworking.metrics.portal.scheduling.JobExecutionRepository;
import com.arpnetworking.metrics.portal.scheduling.impl.DatabaseExecutionHelper;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.ebean.EbeanServer;
import models.ebean.AlertExecution;
import models.internal.Organization;
import models.internal.alerts.Alert;
import models.internal.alerts.AlertEvaluationResult;
import models.internal.scheduling.JobExecution;

import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityNotFoundException;

/**
Expand All @@ -49,16 +55,42 @@ public final class DatabaseAlertExecutionRepository implements AlertExecutionRep
private final EbeanServer _ebeanServer;
private final DatabaseExecutionHelper<AlertEvaluationResult, AlertExecution> _helper;

private static final String ACTOR_NAME = "alertExecutionPartitionCreator";
@Nullable
private ActorRef _partitionCreator;
private final Props _props;
private final ActorSystem _actorSystem;

/**
* Public constructor.
*
* @param ebeanServer Play's {@code EbeanServer} for this repository.
* @param portalServer Play's {@code EbeanServer} for this repository.
* @param partitionServer Play's {@code EbeanServer} for partition creation.
* @param actorSystem The actor system to use.
* @param periodicMetrics A metrics instance to record against.
* @param partitionCreationOffset Daily offset for partition creation, e.g. 0 is midnight
* @param partitionCreationLookahead How many days of partitions to create
*/
@Inject
public DatabaseAlertExecutionRepository(@Named("metrics_portal") final EbeanServer ebeanServer) {
_ebeanServer = ebeanServer;
public DatabaseAlertExecutionRepository(
final EbeanServer portalServer,
final EbeanServer partitionServer,
final ActorSystem actorSystem,
final PeriodicMetrics periodicMetrics,
final Duration partitionCreationOffset,
final int partitionCreationLookahead
) {
_ebeanServer = portalServer;
_helper = new DatabaseExecutionHelper<>(LOGGER, _ebeanServer, this::findOrCreateAlertExecution);

_actorSystem = actorSystem;
_props = DailyPartitionCreator.props(
partitionServer,
periodicMetrics,
"portal",
"alert_executions",
partitionCreationOffset,
partitionCreationLookahead
);
}

private AlertExecution findOrCreateAlertExecution(
Expand Down Expand Up @@ -92,13 +124,26 @@ private AlertExecution findOrCreateAlertExecution(
public void open() {
assertIsOpen(false);
LOGGER.debug().setMessage("Opening DatabaseAlertExecutionRepository").log();
try {
_partitionCreator = _actorSystem.actorOf(_props);
DailyPartitionCreator.start(_partitionCreator, Duration.ofSeconds(5));
} catch (final ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to start partition creator", e);
}
_isOpen.set(true);
}

@Override
public void close() {
assertIsOpen();
LOGGER.debug().setMessage("Closing DatabaseAlertExecutionRepository").log();
assert _partitionCreator != null : "partitionCreator should be non-null when open";
cwbriones marked this conversation as resolved.
Show resolved Hide resolved
try {
DailyPartitionCreator.stop(_partitionCreator, Duration.ofSeconds(5));
_partitionCreator = null;
} catch (final ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to shutdown partition creator", e);
}
_isOpen.set(false);
}

Expand Down
Loading