diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java new file mode 100644 index 000000000..f4f4d1dd6 --- /dev/null +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreator.java @@ -0,0 +1,363 @@ +/* + * Copyright 2020 Dropbox, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.arpnetworking.metrics.portal.alerts.impl; + +import akka.actor.AbstractActorWithTimers; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; +import akka.japi.pf.ReceiveBuilder; +import akka.pattern.Patterns; +import com.arpnetworking.commons.builder.OvalBuilder; +import com.arpnetworking.metrics.incubator.PeriodicMetrics; +import com.arpnetworking.metrics.portal.scheduling.Schedule; +import com.arpnetworking.metrics.portal.scheduling.impl.PeriodicSchedule; +import com.arpnetworking.steno.Logger; +import com.arpnetworking.steno.LoggerFactory; +import com.google.common.collect.Sets; +import io.ebean.EbeanServer; +import io.ebean.SqlQuery; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import javax.persistence.PersistenceException; + +/** + * An actor that will periodically create table partitions. + * + * @author Christian Briones (cbriones at dropbox dot com) + */ +public class DailyPartitionCreator extends AbstractActorWithTimers { + /* package private */ static final Object TICK = "MSG_TICK"; + private static final Logger LOGGER = LoggerFactory.getLogger(DailyPartitionCreator.class); + private static final Duration TICK_INTERVAL = Duration.ofMinutes(1); + private static final String TICKER_NAME = "PERIODIC_TICK"; + + private final EbeanServer _ebeanServer; + private final PeriodicMetrics _periodicMetrics; + private final Set _partitionCache; + + private final int _lookaheadDays; + private final String _schema; + private final String _table; + private final Clock _clock; + private final Schedule _schedule; + private Optional _lastRun; + + private DailyPartitionCreator( + final EbeanServer ebeanServer, + final PeriodicMetrics periodicMetrics, + final String schema, + final String table, + final Duration scheduleOffset, + final int lookahead + ) { + this(ebeanServer, periodicMetrics, schema, table, scheduleOffset, lookahead, Clock.systemUTC()); + } + + /* package private */ DailyPartitionCreator( + final EbeanServer ebeanServer, + final PeriodicMetrics periodicMetrics, + final String schema, + final String table, + final Duration scheduleOffset, + final int lookaheadDays, + final Clock clock + ) { + _ebeanServer = ebeanServer; + _periodicMetrics = periodicMetrics; + _lookaheadDays = lookaheadDays; + _schedule = new PeriodicSchedule.Builder() + .setOffset(scheduleOffset) + .setPeriod(ChronoUnit.DAYS) + .setRunAtAndAfter(Instant.EPOCH) + .setZone(ZoneOffset.UTC) + .build(); + _lastRun = Optional.empty(); + _schema = schema; + _table = table; + _clock = clock; + _partitionCache = Sets.newHashSet(); + } + + /** + * Create {@link Props} for this actor. + * + * @param ebeanServer the ebean server + * @param periodicMetrics metrics instance to use + * @param schema The database schema name + * @param table The parent table name + * @param scheduleOffset Execution offset from midnight + * @param lookahead maximum number of partitions to create in advance + * @return A new Props. + */ + public static Props props( + final EbeanServer ebeanServer, + final PeriodicMetrics periodicMetrics, + final String schema, + final String table, + final Duration scheduleOffset, + final int lookahead + ) { + return Props.create( + DailyPartitionCreator.class, + () -> new DailyPartitionCreator( + ebeanServer, + periodicMetrics, + schema, + table, + scheduleOffset, + lookahead + ) + ); + } + + /** + * Ask the actor referenced by {@code ref} to create the partition(s) needed + * for the given instant. + * + * @param ref an {@code DailyPartitionCreator}. + * @param instant The instant being recorded + * @param timeout timeout for the operation + * @throws ExecutionException if an exception was thrown during execution. + * @throws InterruptedException if the actor does not respond within the allotted timeout, or if the actor thread was + * interrupted for other reasons. + */ + public static void ensurePartitionExistsForInstant( + final ActorRef ref, + final Instant instant, + final Duration timeout + ) throws ExecutionException, InterruptedException { + final LocalDate date = ZonedDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate(); + final CreateForRange createPartitions = new CreateForRange.Builder() + .setStart(date) + .setEnd(date.plusDays(1)) + .build(); + Patterns.ask( + ref, + createPartitions, + timeout + ) + .toCompletableFuture() + .get(); + } + + @Override + public void preStart() { + LOGGER.info().setMessage("Starting execution timer") + .addData("schema", _schema) + .addData("table", _table) + .addData("lookahead", _lookaheadDays) + .log(); + getSelf().tell(TICK, getSelf()); + getTimers().startPeriodicTimer(TICKER_NAME, TICK, TICK_INTERVAL); + } + + + @Override + public void postStop() throws Exception { + super.postStop(); + LOGGER.info().setMessage("Actor was stopped") + .addData("schema", _schema) + .addData("table", _table) + .addData("lookahead", _lookaheadDays) + .log(); + } + + @Override + public Receive createReceive() { + return new ReceiveBuilder() + .matchEquals(TICK, msg -> tick()) + .match(CreateForRange.class, msg -> { + final Status.Status resp = execute(msg.getStart(), msg.getEnd()); + getSender().tell(resp, getSelf()); + }) + .build(); + } + + private void recordCounter(final String metricName, final long value) { + final String fullMetric = String.format("partition_creator/%s/%s", _table, metricName); + _periodicMetrics.recordCounter(fullMetric, value); + } + + // Message handlers + + private void tick() { + recordCounter("tick", 1); + + final Instant now = _clock.instant(); + if (_schedule.nextRun(_lastRun).map(run -> run.isBefore(now)).orElse(true)) { + final LocalDate startDate = ZonedDateTime.ofInstant(now, ZoneOffset.UTC).toLocalDate(); + final LocalDate endDate = startDate.plusDays(_lookaheadDays); + + final CreateForRange createPartitions = new CreateForRange.Builder() + .setStart(startDate) + .setEnd(endDate) + .build(); + getSelf().tell(createPartitions, ActorRef.noSender()); + } + } + + private Status.Status execute(final LocalDate startDate, final LocalDate endDate) { + + // Much like other portions of the codebase dealing with time, the dates + // used in this class are all fixed to UTC. So while the code in this + // method uses a LocalDate, there's an implicit assumption that all + // dates are UTC and these conversions happen at the interaction + // boundary (tick, ensurePartitionExists). + + LocalDate d = startDate; + boolean allPartitionsExist = true; + while (!d.equals(endDate)) { + if (!_partitionCache.contains(d)) { + allPartitionsExist = false; + break; + } + d = d.plusDays(1); + } + if (allPartitionsExist) { + LOGGER.debug() + .setMessage("partitions already exist, ignoring execute request") + .addData("schema", _schema) + .addData("table", _table) + .addData("startDate", startDate) + .addData("endDate", endDate) + .log(); + return new Status.Success(null); + } + + LOGGER.info() + .setMessage("Creating daily partitions for table") + .addData("schema", _schema) + .addData("table", _table) + .addData("startDate", startDate) + .addData("endDate", endDate) + .log(); + + Status.Status status = new Status.Success(null); + try { + execute(_schema, _table, startDate, endDate); + _lastRun = Optional.of(_clock.instant()); + updateCache(startDate, endDate); + } catch (final PersistenceException e) { + status = new Status.Failure(e); + LOGGER.error() + .setMessage("Failed to create daily partitions for table") + .addData("schema", _schema) + .addData("table", _table) + .addData("startDate", startDate) + .addData("endDate", endDate) + .setThrowable(e) + .log(); + } finally { + recordCounter("create", status instanceof Status.Success ? 0 : 1); + } + return status; + } + + private void updateCache(final LocalDate start, final LocalDate end) { + LocalDate date = start; + while (!date.equals(end)) { + _partitionCache.add(date); + date = date.plusDays(1); + } + } + + /** + * Create a series of daily partitions for the given parameters. + * + * @param schema the database schema + * @param table the parent table + * @param startDate the start date, inclusive. + * @param endDate the end date, exclusive. + */ + protected void execute( + final String schema, + final String table, + final LocalDate startDate, + final LocalDate endDate + ) { + // While this query does not return anything meaningful semantically, + // it still returns a "non-empty" void result and so we can't use the + // ordinarily more appropriate SqlUpdate type. + final SqlQuery sql = _ebeanServer.createSqlQuery( + "select * from create_daily_partition(?::text, ?::text, ?::date, ?::date)") + .setParameter(1, schema) + .setParameter(2, table) + .setParameter(3, startDate) + .setParameter(4, endDate); + + sql.findOneOrEmpty().orElseThrow(() -> new PersistenceException("Expected a single empty result.")); + } + + private static final class CreateForRange { + private final LocalDate _start; + private final LocalDate _end; + + private CreateForRange(final Builder builder) { + _start = builder._start; + _end = builder._end; + } + + public LocalDate getStart() { + return _start; + } + + public LocalDate getEnd() { + return _end; + } + + static final class Builder extends OvalBuilder { + private LocalDate _start; + private LocalDate _end; + + Builder() { + super(CreateForRange::new); + } + + /** + * Sets the start. + * + * @param start the start. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setStart(final LocalDate start) { + _start = start; + return this; + } + + /** + * Sets the end. + * + * @param end the end. + * @return This instance of {@code Builder} for chaining. + */ + public Builder setEnd(final LocalDate end) { + _end = end; + return this; + } + } + } +} diff --git a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java index 911dd2d6b..1547654b3 100644 --- a/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java +++ b/app/com/arpnetworking/metrics/portal/alerts/impl/DatabaseAlertExecutionRepository.java @@ -15,11 +15,17 @@ */ package com.arpnetworking.metrics.portal.alerts.impl; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.pattern.Patterns; +import com.arpnetworking.metrics.incubator.PeriodicMetrics; import com.arpnetworking.metrics.portal.alerts.AlertExecutionRepository; import com.arpnetworking.metrics.portal.scheduling.JobExecutionRepository; import com.arpnetworking.metrics.portal.scheduling.impl.DatabaseExecutionHelper; import com.arpnetworking.steno.Logger; import com.arpnetworking.steno.LoggerFactory; +import edu.umd.cs.findbugs.annotations.Nullable; import io.ebean.EbeanServer; import models.ebean.AlertExecution; import models.internal.Organization; @@ -27,13 +33,16 @@ import models.internal.alerts.AlertEvaluationResult; import models.internal.scheduling.JobExecution; +import java.time.Duration; import java.time.Instant; import java.util.NoSuchElementException; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.inject.Inject; -import javax.inject.Named; import javax.persistence.EntityNotFoundException; /** @@ -44,21 +53,48 @@ public final class DatabaseAlertExecutionRepository implements AlertExecutionRepository { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseAlertExecutionRepository.class); + private static final Duration ACTOR_STOP_TIMEOUT = Duration.ofSeconds(5); private final AtomicBoolean _isOpen = new AtomicBoolean(false); private final EbeanServer _ebeanServer; private final DatabaseExecutionHelper _helper; + private static final String ACTOR_NAME = "alertExecutionPartitionCreator"; + @Nullable + private ActorRef _partitionCreator; + private final Props _props; + private final ActorSystem _actorSystem; + /** * Public constructor. * - * @param ebeanServer Play's {@code EbeanServer} for this repository. + * @param portalServer Play's {@code EbeanServer} for this repository. + * @param partitionServer Play's {@code EbeanServer} for partition creation. + * @param actorSystem The actor system to use. + * @param periodicMetrics A metrics instance to record against. + * @param partitionCreationOffset Daily offset for partition creation, e.g. 0 is midnight + * @param partitionCreationLookahead How many days of partitions to create */ @Inject - public DatabaseAlertExecutionRepository(@Named("metrics_portal") final EbeanServer ebeanServer) { - _ebeanServer = ebeanServer; + public DatabaseAlertExecutionRepository( + final EbeanServer portalServer, + final EbeanServer partitionServer, + final ActorSystem actorSystem, + final PeriodicMetrics periodicMetrics, + final Duration partitionCreationOffset, + final int partitionCreationLookahead + ) { + _ebeanServer = portalServer; _helper = new DatabaseExecutionHelper<>(LOGGER, _ebeanServer, this::findOrCreateAlertExecution); - + _actorSystem = actorSystem; + _props = DailyPartitionCreator.props( + partitionServer, + periodicMetrics, + "portal", + "alert_executions", + partitionCreationOffset, + partitionCreationLookahead + ); } private AlertExecution findOrCreateAlertExecution( @@ -92,6 +128,7 @@ private AlertExecution findOrCreateAlertExecution( public void open() { assertIsOpen(false); LOGGER.debug().setMessage("Opening DatabaseAlertExecutionRepository").log(); + _partitionCreator = _actorSystem.actorOf(_props); _isOpen.set(true); } @@ -99,6 +136,17 @@ public void open() { public void close() { assertIsOpen(); LOGGER.debug().setMessage("Closing DatabaseAlertExecutionRepository").log(); + if (_partitionCreator == null) { + throw new IllegalStateException("partitionCreator should be non-null when open"); + } + try { + Patterns.gracefulStop(_partitionCreator, ACTOR_STOP_TIMEOUT) + .toCompletableFuture() + .get(ACTOR_STOP_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + _partitionCreator = null; + } catch (final TimeoutException | ExecutionException | InterruptedException e) { + throw new RuntimeException("Failed to shutdown partition creator", e); + } _isOpen.set(false); } @@ -161,6 +209,7 @@ public Optional> getLastCompleted(final UUID @Override public void jobStarted(final UUID alertId, final Organization organization, final Instant scheduled) { assertIsOpen(); + ensurePartition(scheduled); _helper.jobStarted(alertId, organization, scheduled); } @@ -172,12 +221,14 @@ public void jobSucceeded( final AlertEvaluationResult result ) { assertIsOpen(); + ensurePartition(scheduled); _helper.jobSucceeded(alertId, organization, scheduled, result); } @Override public void jobFailed(final UUID alertId, final Organization organization, final Instant scheduled, final Throwable error) { assertIsOpen(); + ensurePartition(scheduled); _helper.jobFailed(alertId, organization, scheduled, error); } @@ -191,4 +242,21 @@ private void assertIsOpen(final boolean expectedState) { expectedState ? "open" : "closed")); } } + + private void ensurePartition(final Instant scheduled) { + if (_partitionCreator == null) { + throw new IllegalStateException("partitionCreator should be non-null when open"); + } + try { + DailyPartitionCreator.ensurePartitionExistsForInstant( + _partitionCreator, + scheduled, + Duration.ofSeconds(1) + ); + } catch (final InterruptedException e) { + throw new RuntimeException("partition creation interrupted", e); + } catch (final ExecutionException e) { + throw new RuntimeException("Could not ensure partition for instant: " + scheduled, e); + } + } } diff --git a/app/global/MainModule.java b/app/global/MainModule.java index 54d3653e5..c87d882c0 100644 --- a/app/global/MainModule.java +++ b/app/global/MainModule.java @@ -51,6 +51,7 @@ import com.arpnetworking.metrics.incubator.impl.TsdPeriodicMetrics; import com.arpnetworking.metrics.portal.alerts.AlertExecutionRepository; import com.arpnetworking.metrics.portal.alerts.AlertRepository; +import com.arpnetworking.metrics.portal.alerts.impl.DatabaseAlertExecutionRepository; import com.arpnetworking.metrics.portal.alerts.impl.FileAlertRepository; import com.arpnetworking.metrics.portal.alerts.scheduling.AlertExecutionContext; import com.arpnetworking.metrics.portal.alerts.scheduling.AlertJobRepository; @@ -101,6 +102,7 @@ import models.internal.Features; import models.internal.MetricsQueryFormat; import models.internal.impl.DefaultFeatures; +import org.flywaydb.play.PlayInitializer; import play.Environment; import play.api.Configuration; import play.api.db.evolutions.DynamicEvolutions; @@ -109,6 +111,7 @@ import play.db.ebean.EbeanConfig; import play.inject.ApplicationLifecycle; import play.libs.Json; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import java.net.URI; @@ -147,6 +150,10 @@ protected void configure() { .annotatedWith(Names.named("metrics_portal")) .toProvider(MetricsPortalEbeanServerProvider.class); + bind(EbeanServer.class) + .annotatedWith(Names.named("metrics_portal_ddl")) + .toProvider(MetricsPortalDDLEbeanServerProvider.class); + // Ebean initializes the ServerConfig from outside of Play/Guice so we can't hook in any dependencies without // statically injecting them. Construction still happens at inject time, however. requestStaticInjection(MetricsPortalServerConfigStartup.class); @@ -433,13 +440,38 @@ private FileAlertRepository provideFileSystemAlertRepository( ); } + @Provides + @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice + private DatabaseAlertExecutionRepository provideDatabaseAlertExecutionRepository( + final Config config, + final PeriodicMetrics periodicMetrics, + final ActorSystem actorSystem, + @Named("metrics_portal") final EbeanServer portalServer, + @Named("metrics_portal_ddl") final EbeanServer ddlServer + ) { + final Config partitionConfig = config.getObject("alertExecutionRepository.partitionManager").toConfig(); + + final int maxLookAhead = partitionConfig.getInt("lookahead"); + final Duration offset = ConfigurationHelper.getFiniteDuration(partitionConfig, "offset"); + return new DatabaseAlertExecutionRepository( + portalServer, + ddlServer, + actorSystem, + periodicMetrics, + java.time.Duration.ofSeconds(offset.toSeconds()), + maxLookAhead + ); + } + private static final class MetricsPortalEbeanServerProvider implements Provider { @Inject MetricsPortalEbeanServerProvider( final Configuration configuration, final DynamicEvolutions dynamicEvolutions, - final EbeanConfig ebeanConfig) { + final EbeanConfig ebeanConfig, + final PlayInitializer flywayInitializer) { // Constructor arguments injected for dependency resolution only + // e.g. requiring migrations to run } @Override @@ -448,6 +480,23 @@ public EbeanServer get() { } } + private static final class MetricsPortalDDLEbeanServerProvider implements Provider { + @Inject + MetricsPortalDDLEbeanServerProvider( + final Configuration configuration, + final DynamicEvolutions dynamicEvolutions, + final EbeanConfig ebeanConfig, + final PlayInitializer flywayInitializer) { + // Constructor arguments injected for dependency resolution only + // e.g. requiring migrations to run + } + + @Override + public EbeanServer get() { + return Ebean.getServer("metrics_portal_ddl"); + } + } + private static final class OrganizationRepositoryProvider implements Provider { @Inject OrganizationRepositoryProvider( @@ -552,7 +601,8 @@ private static final class AlertExecutionRepositoryProvider implements Provider< final Injector injector, final Environment environment, final Config configuration, - final ApplicationLifecycle lifecycle) { + final ApplicationLifecycle lifecycle + ) { _injector = injector; _environment = environment; _configuration = configuration; diff --git a/conf/db/migration/metrics_portal_ddl/V22__replace_alert_execution_function.sql b/conf/db/migration/metrics_portal_ddl/V22__replace_alert_execution_function.sql new file mode 100644 index 000000000..5af5e351e --- /dev/null +++ b/conf/db/migration/metrics_portal_ddl/V22__replace_alert_execution_function.sql @@ -0,0 +1,49 @@ +-- Copyright 2020 Dropbox, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- Create daily partition tables _YEAR_MONTH_DAY for a specified parent table. +-- +-- Example for portal.alert_executions, 8 May 2020: +-- +-- CREATE TABLE portal.alert_executions_2020_05_08 PARTITION OF portal.alert_executions +-- FOR VALUES FROM ('2020-05-08') TO ('2020-05-09'); +-- +-- Params: +-- Table - Text - The name of the parent table. +-- Start - Date - The beginning date of the time range, inclusive. +-- End - Date - The end date of the time range, exclusive. +CREATE OR REPLACE FUNCTION create_daily_partition(schema TEXT, tablename TEXT, start_date DATE, end_date DATE) +returns void AS $$ +DECLARE + partition_table text; + day DATE; +BEGIN + FOR day IN (SELECT d FROM generate_series(start_date, end_date, '1 day') AS d) + LOOP + partition_table := tablename || '_' || TO_CHAR(day, 'YYYY_MM_DD'); + EXECUTE format( + $query$ + CREATE TABLE IF NOT EXISTS %I.%I PARTITION OF %I.%I FOR VALUES FROM (%L) TO (%L); + $query$, + schema, + partition_table, + schema, + tablename, + day, + day + 1 + ); + END LOOP; +END; +$$ +language plpgsql; diff --git a/conf/postgresql.application.conf b/conf/postgresql.application.conf index 707eeaa3c..6f033997b 100644 --- a/conf/postgresql.application.conf +++ b/conf/postgresql.application.conf @@ -54,7 +54,7 @@ db { migration.schemas = ["portal"] hikaricp.poolName = "metrics_portal_ddl" - hikaricp.maximumPoolSize = 2 + hikaricp.maximumPoolSize = 5 } akka_ddl { @@ -85,7 +85,10 @@ play.evolutions.enabled = false play.modules.enabled += "org.flywaydb.play.PlayModule" ebeanconfig.datasource.default = "metrics_portal" play.ebean.defaultDatasource = "metrics_portal" + ebean.metrics_portal = ["models.ebean.*", "global.MetricsPortalServerConfigStartup"] +# Only used for table creation, so no models are needed. +ebean.metrics_portal_ddl = [] # Host repository # ~~~~~ @@ -94,6 +97,10 @@ hostRepository.type = "com.arpnetworking.metrics.portal.hosts.impl.DatabaseHostR # Alerts # ~~~~~ alertExecutionRepository.type = "com.arpnetworking.metrics.portal.alerts.impl.DatabaseAlertExecutionRepository" +alertExecutionRepository.partitionManager { + lookahead = 7 + offset = "0s" +} # Reports # ~~~~~ diff --git a/extra-checkstyle-suppressions.xml b/extra-checkstyle-suppressions.xml index 4c60f704a..cc520c307 100644 --- a/extra-checkstyle-suppressions.xml +++ b/extra-checkstyle-suppressions.xml @@ -25,4 +25,7 @@ + + + diff --git a/main/postgres/initdb.d/init.sql.1 b/main/postgres/initdb.d/init.sql.1 index 3951ae522..9a24c3864 100644 --- a/main/postgres/initdb.d/init.sql.1 +++ b/main/postgres/initdb.d/init.sql.1 @@ -27,7 +27,7 @@ ALTER ROLE metrics_app WITH NOSUPERUSER INHERIT NOCREATEDB NOCREATEROLE NOREPLIC CREATE ROLE metrics_dba LOGIN; ALTER ROLE metrics_dba WITH PASSWORD 'metrics_dba_password'; -ALTER ROLE metrics_dba WITH NOSUPERUSER INHERIT NOCREATEDB NOCREATEROLE NOREPLICATION CONNECTION LIMIT 6; +ALTER ROLE metrics_dba WITH NOSUPERUSER INHERIT NOCREATEDB NOCREATEROLE NOREPLICATION CONNECTION LIMIT 15; CREATE ROLE akka_app LOGIN; ALTER ROLE akka_app WITH PASSWORD 'akka_app_password'; diff --git a/pom.xml b/pom.xml index fd7c8bc8d..747b66e09 100644 --- a/pom.xml +++ b/pom.xml @@ -1187,7 +1187,6 @@ org.flywaydb flyway-play_${scala.package.version} ${flyway.play.version} - runtime org.postgresql diff --git a/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java new file mode 100644 index 000000000..c17436a25 --- /dev/null +++ b/test/java/com/arpnetworking/metrics/portal/alerts/impl/DailyPartitionCreatorTest.java @@ -0,0 +1,238 @@ +/* + * Copyright 2020 Dropbox, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.arpnetworking.metrics.portal.alerts.impl; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.testkit.javadsl.TestKit; +import com.arpnetworking.commons.java.time.ManualClock; +import com.arpnetworking.metrics.incubator.PeriodicMetrics; +import com.google.common.base.MoreObjects; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.ebean.EbeanServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.ExecutionException; +import javax.persistence.PersistenceException; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DailyPartitionCreator}. + * + * These don't actually exercise the DB code, but that case should be covered + * instead by the integration tests that use this class. + * + * @author Christian Briones (cbriones at dropbox dot com) + */ +public class DailyPartitionCreatorTest { + private static final String TEST_SCHEMA = "TEST_SCHEMA"; + private static final String TEST_TABLE = "TEST_TABLE"; + private static final Duration MSG_TIMEOUT = Duration.ofSeconds(1); + private static final long TEST_LOOKAHEAD = 7; + + private ManualClock _clock; + + // Unused Mocks + private EbeanServer _server; + private PeriodicMetrics _metrics; + + // ActorSystem fields + private ActorSystem _actorSystem; + private TestKit _probe; + + @Before + public void setUp() { + _server = Mockito.mock(EbeanServer.class); + _metrics = Mockito.mock(PeriodicMetrics.class); + _clock = new ManualClock(Instant.now(), Duration.ofDays(1), ZoneOffset.UTC); + + _actorSystem = ActorSystem.create(); + _probe = new TestKit(_actorSystem); + } + + private ActorRef createActor() { + return createActor(() -> { }); + } + + @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON") + private ActorRef createActor(final Runnable onExecute) { + // Create an actor with the db execution behavior mocked out. + final Props props = Props.create( + DailyPartitionCreator.class, + () -> new DailyPartitionCreator( + _server, + _metrics, + TEST_SCHEMA, + TEST_TABLE, + Duration.ZERO, + (int) TEST_LOOKAHEAD, + _clock + ) { + @Override + protected void execute( + final String schema, + final String table, + final LocalDate startDate, + final LocalDate endDate + ) { + onExecute.run(); + _probe.getRef().tell( + new ExecuteCall(schema, table, startDate, endDate), + _probe.getRef() + ); + } + } + ); + final ActorRef ref = _actorSystem.actorOf(props); + _probe.watch(ref); + return ref; + } + + @After + public void tearDown() { + TestKit.shutdownActorSystem(_actorSystem); + } + + @Test + public void testCreatePartitionsOnTick() throws Exception { + final ActorRef ref = createActor(); + + // The actor will tick on startup. + ExecuteCall call = _probe.expectMsgClass(ExecuteCall.class); + long clockDifference = ChronoUnit.DAYS.between( + call.getStart(), + ZonedDateTime.ofInstant(_clock.instant(), _clock.getZone()) + ); + assertThat("range should start from current date", clockDifference, equalTo(0L)); + assertThat(call.getSchema(), equalTo(TEST_SCHEMA)); + assertThat(call.getTable(), equalTo(TEST_TABLE)); + + for (int i = 0; i < 3; i++) { + // Clock didn't move + ref.tell(DailyPartitionCreator.TICK, _probe.getRef()); + _probe.expectNoMessage(MSG_TIMEOUT); + + // Clock moved 1 day + _clock.tick(); + ref.tell(DailyPartitionCreator.TICK, _probe.getRef()); + call = _probe.expectMsgClass(ExecuteCall.class); + + assertThat(call.getSchema(), equalTo(TEST_SCHEMA)); + assertThat(call.getTable(), equalTo(TEST_TABLE)); + + final long difference = ChronoUnit.DAYS.between(call.getStart(), call.getEnd()); + assertThat("range should respect lookahead", difference, equalTo(TEST_LOOKAHEAD)); + + clockDifference = ChronoUnit.DAYS.between( + call.getStart(), + ZonedDateTime.ofInstant(_clock.instant(), _clock.getZone()) + ); + assertThat("range should start from current date", clockDifference, equalTo(0L)); + } + Patterns.gracefulStop(ref, MSG_TIMEOUT).toCompletableFuture().get(); + _probe.expectTerminated(ref); + } + + @Test + public void testCreatePartitionsOnDemand() throws Exception { + final ZonedDateTime oneWeekAgo = ZonedDateTime.now().minusDays(7); + final LocalDate oneWeekAgoLocal = oneWeekAgo.toLocalDate(); + final ActorRef ref = createActor(); + + // The actor will tick on startup. + _probe.expectMsgClass(ExecuteCall.class); + + DailyPartitionCreator.ensurePartitionExistsForInstant(ref, oneWeekAgo.toInstant(), MSG_TIMEOUT); + final ExecuteCall call = _probe.expectMsgClass(ExecuteCall.class); + assertThat(call.getStart(), equalTo(oneWeekAgoLocal)); + assertThat(call.getEnd(), equalTo(oneWeekAgoLocal.plusDays(1))); + assertThat(call.getSchema(), equalTo(TEST_SCHEMA)); + assertThat(call.getTable(), equalTo(TEST_TABLE)); + + DailyPartitionCreator.ensurePartitionExistsForInstant(ref, oneWeekAgo.toInstant(), MSG_TIMEOUT); + _probe.expectNoMessage(); // should have been cached + + Patterns.gracefulStop(ref, MSG_TIMEOUT).toCompletableFuture().get(); + _probe.expectTerminated(ref); + } + + @Test(expected = ExecutionException.class) + public void testExecutionError() throws Exception { + final ActorRef ref = createActor( + () -> { + throw new PersistenceException("Something went wrong"); + } + ); + DailyPartitionCreator.ensurePartitionExistsForInstant(ref, Instant.now(), MSG_TIMEOUT); + } + + /** + * Helper message class used by the test probe in this suite to inspect arguments to DailyPartionCreator#execute. + */ + public static final class ExecuteCall { + private final String _schema; + private final String _table; + private final LocalDate _start; + private final LocalDate _end; + + public ExecuteCall(final String schema, final String table, final LocalDate start, final LocalDate end) { + _schema = schema; + _table = table; + _start = start; + _end = end; + } + + public String getSchema() { + return _schema; + } + + public String getTable() { + return _table; + } + + public LocalDate getStart() { + return _start; + } + + public LocalDate getEnd() { + return _end; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("_schema", _schema) + .add("_table", _table) + .add("_start", _start) + .add("_end", _end) + .toString(); + } + } +} diff --git a/test/java/com/arpnetworking/metrics/portal/integration/repositories/DatabaseAlertExecutionRepositoryIT.java b/test/java/com/arpnetworking/metrics/portal/integration/repositories/DatabaseAlertExecutionRepositoryIT.java index cd90ffd88..3761c0f20 100644 --- a/test/java/com/arpnetworking/metrics/portal/integration/repositories/DatabaseAlertExecutionRepositoryIT.java +++ b/test/java/com/arpnetworking/metrics/portal/integration/repositories/DatabaseAlertExecutionRepositoryIT.java @@ -16,6 +16,9 @@ package com.arpnetworking.metrics.portal.integration.repositories; +import akka.actor.ActorSystem; +import akka.testkit.javadsl.TestKit; +import com.arpnetworking.metrics.incubator.PeriodicMetrics; import com.arpnetworking.metrics.portal.TestBeanFactory; import com.arpnetworking.metrics.portal.alerts.impl.DatabaseAlertExecutionRepository; import com.arpnetworking.metrics.portal.integration.test.EbeanServerHelper; @@ -26,7 +29,9 @@ import models.internal.Organization; import models.internal.alerts.AlertEvaluationResult; import models.internal.impl.DefaultAlertEvaluationResult; +import org.mockito.Mockito; +import java.time.Duration; import java.util.UUID; /** @@ -35,18 +40,37 @@ * @author Christian Briones (cbriones at dropbox dot com) */ public class DatabaseAlertExecutionRepositoryIT extends JobExecutionRepositoryIT { + private ActorSystem _actorSystem; + @Override public JobExecutionRepository setUpRepository(final Organization organization, final UUID jobId) { final EbeanServer server = EbeanServerHelper.getMetricsDatabase(); + final EbeanServer adminServer = EbeanServerHelper.getAdminMetricsDatabase(); + + // DatabaseAlertExecutionRepository does not validate that the JobID is a valid AlertID since those + // references are not constrained in the underlying execution table. final models.ebean.Organization ebeanOrganization = TestBeanFactory.createEbeanOrganization(); ebeanOrganization.setUuid(organization.getId()); server.save(ebeanOrganization); - // DatabaseAlertExecutionRepository does not validate that the JobID is a valid AlertID since those - // references are not constrained in the underlying execution table. + _actorSystem = ActorSystem.create(); + final PeriodicMetrics metricsMock = Mockito.mock(PeriodicMetrics.class); + + return new DatabaseAlertExecutionRepository( + server, + adminServer, + _actorSystem, + metricsMock, + Duration.ZERO, + 5 // Arbitrary, but helps distinguish logs + ); + } - return new DatabaseAlertExecutionRepository(server); + @Override + public void tearDown() { + super.tearDown(); + TestKit.shutdownActorSystem(_actorSystem); } @Override diff --git a/test/java/com/arpnetworking/metrics/portal/integration/test/EbeanServerHelper.java b/test/java/com/arpnetworking/metrics/portal/integration/test/EbeanServerHelper.java index a4c762613..fe8c4d53f 100644 --- a/test/java/com/arpnetworking/metrics/portal/integration/test/EbeanServerHelper.java +++ b/test/java/com/arpnetworking/metrics/portal/integration/test/EbeanServerHelper.java @@ -53,6 +53,7 @@ public static synchronized EbeanServer getMetricsDatabase() { METRICS_DATABASE_NAME, METRICS_DATABASE_USERNAME, METRICS_DATABASE_PASSWORD, + DEFAULT_POOL_SIZE, true); migrateServer( getEnvOrDefault("PG_HOST", "localhost"), @@ -66,19 +67,41 @@ public static synchronized EbeanServer getMetricsDatabase() { return ebeanServer; } + /** + * Obtain a reference to the shared Metrics database {@code EbeanServer} with the admin user. + * + * @return reference to the shared Metrics database {@code EbeanServer} + */ + public static synchronized EbeanServer getAdminMetricsDatabase() { + @Nullable EbeanServer ebeanServer = EBEAN_SERVER_MAP.get(METRICS_ADMIN_NAME); + if (ebeanServer == null) { + ebeanServer = createEbeanServer( + getEnvOrDefault("PG_HOST", "localhost"), + getEnvOrDefault("PG_PORT", DEFAULT_POSTGRES_PORT, Integer::parseInt), + METRICS_DATABASE_NAME, + METRICS_DATABASE_ADMIN_USERNAME, + METRICS_DATABASE_ADMIN_PASSWORD, + ADMIN_POOL_SIZE, + false); + EBEAN_SERVER_MAP.put(METRICS_ADMIN_NAME, ebeanServer); + } + return ebeanServer; + } + private static EbeanServer createEbeanServer( final String hostname, final int port, final String database, final String username, final String password, + final int poolSize, final boolean setAsDefault) { final String name = database + "-" + UUID.randomUUID().toString(); final HikariConfig hikariConfig = new HikariConfig(); hikariConfig.setDriverClassName("org.postgresql.Driver"); hikariConfig.setJdbcUrl(createJdbcUrl(hostname, port, database)); - hikariConfig.setMaximumPoolSize(DEFAULT_POOL_SIZE); + hikariConfig.setMaximumPoolSize(poolSize); hikariConfig.setPassword(password); hikariConfig.setPoolName(name); hikariConfig.setUsername(username); @@ -89,6 +112,8 @@ private static EbeanServer createEbeanServer( serverConfig.setDataSource(new HikariDataSource(hikariConfig)); serverConfig.addPackage("models.ebean"); serverConfig.setObjectMapper(SerializationTestUtils.getApiObjectMapper()); + serverConfig.setDataTimeZone("UTC"); + return EbeanServerFactory.create(serverConfig); } @@ -128,10 +153,12 @@ private EbeanServerHelper() {} private static final Map EBEAN_SERVER_MAP = Maps.newHashMap(); private static final String METRICS_DATABASE_NAME = "metrics"; + private static final String METRICS_ADMIN_NAME = "metrics_ddl"; private static final String METRICS_DATABASE_USERNAME = "metrics_app"; private static final String METRICS_DATABASE_PASSWORD = "metrics_app_password"; private static final String METRICS_DATABASE_ADMIN_USERNAME = "metrics_dba"; private static final String METRICS_DATABASE_ADMIN_PASSWORD = "metrics_dba_password"; private static final int DEFAULT_POSTGRES_PORT = 6432; private static final int DEFAULT_POOL_SIZE = 50; + private static final int ADMIN_POOL_SIZE = 5; }