Loader integration tests

spenes committed Jul 10, 2023
1 parent ef87621 commit 6ba622e

Showing 15 changed files with 597 additions and 93 deletions.
@@ -1,14 +1,31 @@
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => EventGenerator}
import com.snowplowanalytics.snowplow.eventgen.protocol.event.{EventFrequencies, UnstructEventFrequencies}
import org.scalacheck.Gen
import org.scalacheck.rng.Seed
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils

import java.time.Instant

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

import org.scalacheck.Gen
import org.scalacheck.rng.Seed

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.eventgen.enrich.{SdkEvent => EventGenerator}
import com.snowplowanalytics.snowplow.eventgen.protocol.event.{EventFrequencies, UnstructEventFrequencies}

final case class InputBatch(content: InputBatch.Content, delay: FiniteDuration = 0.minutes) {
def delayed(value: FiniteDuration) = this.copy(delay = value)
}
@@ -46,3 +63,4 @@ object InputBatch
case Content.SdkEvents(events) => events.map(_.toTsv)
}
}
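
A hedged usage sketch (not part of this commit) to make the type's role concrete: two batches, the second held back so it lands in a later transformer window. `Content.SdkEvents` is the only variant visible in this hunk, and the empty event list is a stand-in for generated events.

```scala
import scala.concurrent.duration.DurationInt

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch.Content

// Stand-in for events produced by the event generator.
val someEvents: List[Event] = Nil

// The first batch is sent immediately; the second is held back for two
// minutes so it falls into a later transformer window.
val batches: List[InputBatch] = List(
  InputBatch(Content.SdkEvents(someEvents)),
  InputBatch(Content.SdkEvents(someEvents)).delayed(2.minutes)
)
```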

@@ -0,0 +1,105 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils

import scala.concurrent.duration.DurationInt

import cats.implicits._
import cats.effect.IO

import retry._

import fs2.Stream
import fs2.concurrent.{Signal, SignallingRef}

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.Queue

object ItUtils {

private val timeout = 15.minutes

def produceInput(batches: List[InputBatch], producer: Queue.Producer[IO]): Stream[IO, Unit] =
Stream.eval {
IO.sleep(10.seconds) *> batches.traverse_ { batch =>
IO.sleep(batch.delay) *> InputBatch
.asTextLines(batch.content)
.parTraverse_(producer.send)
}
}

def consumeAllIncomingWindows[A <: GetShreddingComplete](
queueConsumer: Queue.Consumer[IO],
countExpectations: CountExpectations,
windowsAccumulator: SignallingRef[IO, WindowsAccumulator[A]],
getWindowOutput: LoaderMessage.ShreddingComplete => IO[A]
): Stream[IO, Unit] =
queueConsumer.read
.map(_.content)
.map(parseShreddingCompleteMessage)
.evalMap(getWindowOutput(_))
.evalMap { windowOutput =>
windowsAccumulator.update(_.addWindow(windowOutput))
}
.interruptWhen(allEventsProcessed(windowsAccumulator, countExpectations))
.interruptAfter(timeout)

private def allEventsProcessed[A <: GetShreddingComplete](
windowsAccumulator: SignallingRef[IO, WindowsAccumulator[A]],
countExpectations: CountExpectations
): Signal[IO, Boolean] =
windowsAccumulator
.map(_.getTotalNumberOfEvents >= countExpectations.total)

def parseShreddingCompleteMessage(message: String): LoaderMessage.ShreddingComplete =
LoaderMessage
.fromString(message)
.right
.get
.asInstanceOf[LoaderMessage.ShreddingComplete]

def retryUntilNonEmpty[A](io: IO[List[A]]): IO[List[A]] =
retryingOnFailures[List[A]](
policy = RetryPolicies.capDelay[IO](15.minutes, RetryPolicies.constantDelay[IO](30.seconds)),
wasSuccessful = items => IO.delay(items.nonEmpty),
onFailure = (r, d) => IO.delay(println(s"$r - $d"))
)(io)


final case class CountExpectations(good: Int, bad: Int) {
def total = good + bad
}

final case class WindowsAccumulator[A <: GetShreddingComplete](value: List[A]) {
def addWindow(window: A): WindowsAccumulator[A] =
WindowsAccumulator(value :+ window)

def getTotalNumberOfEvents: Long =
value.map { window =>
val good = window.shredding_complete.count.map(_.good).getOrElse(0L)
val bad = window.shredding_complete.count.flatMap(_.bad).getOrElse(0L)
good + bad
}.sum
}

trait GetShreddingComplete {
def shredding_complete: LoaderMessage.ShreddingComplete
}

implicit class ManifestItemListUtils(items: List[LoaderMessage.ManifestItem]) {
def totalGood: Long =
items.foldLeft(0L){ (acc, i) => acc + i.count.map(_.good).getOrElse(0L) }
}
}
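
A usage sketch (not from this commit) of how the last two helpers combine: poll a manifest query until the loader has written at least one row, then sum the per-item good counts via `totalGood`. `queryManifestItems` is a hypothetical DAO accessor.

```scala
import cats.effect.IO

import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._

// Hypothetical query that may return an empty list while loading is in flight.
def queryManifestItems: IO[List[LoaderMessage.ManifestItem]] = ???

// Polls every 30 seconds (per the policy in retryUntilNonEmpty) until rows
// appear, then folds the per-item good counts into a single total.
val goodEventsLoaded: IO[Long] =
  retryUntilNonEmpty(queryManifestItems).map(_.totalGood)
```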
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.experimental

import java.util.UUID

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.rdbloader.azure.{AzureKeyVault, KafkaConsumer, KafkaProducer}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore}

trait AzureTestResources extends CloudResources {
val eventHubsUrlEnv = "TEST_LOADER_EVENTHUBS_URL"
val inputHubNameEnv = "TEST_LOADER_INPUT_HUB_NAME"
val outputHubNameEnv = "TEST_LOADER_OUTPUT_HUB_NAME"
val inputHubKeyEnv = "TEST_LOADER_INPUT_HUB_KEY"
val outputHubKeyEnv = "TEST_LOADER_OUTPUT_HUB_KEY"
val azureKeyVaultNameEnv = "TEST_LOADER_AZURE_KEY_VAULT_NAME"

def createConsumer: Resource[IO, Queue.Consumer[IO]] =
KafkaConsumer
.consumer[IO](
bootstrapServers = System.getenv(eventHubsUrlEnv),
topicName = System.getenv(outputHubNameEnv),
consumerConf = Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "PLAIN",
"sasl.jaas.config" ->
s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${
System.getenv(outputHubKeyEnv)
}\";""",
"group.id" -> s"test-consumer-${UUID.randomUUID()}",
"enable.auto.commit" -> "true",
"auto.offset.reset" -> "latest"
)
)

def createProducer: Resource[IO, Queue.Producer[IO]] =
KafkaProducer
.producer[IO](
bootstrapServers = System.getenv(eventHubsUrlEnv),
topicName = System.getenv(inputHubNameEnv),
producerConf = Map(
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "PLAIN",
"sasl.jaas.config" ->
s"""org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$$ConnectionString\" password=\"${
System.getenv(inputHubKeyEnv)
}\";"""
)
)

def createSecretStore: Resource[IO, SecretStore[IO]] =
AzureKeyVault.create[IO](Some(System.getenv(azureKeyVaultNameEnv)))

override def getCloudResourcesEnvVars: List[String] = List(
eventHubsUrlEnv,
inputHubNameEnv,
outputHubNameEnv,
inputHubKeyEnv,
outputHubKeyEnv,
azureKeyVaultNameEnv
)
}
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.experimental

import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore}

trait CloudResources {

def createConsumer: Resource[IO, Queue.Consumer[IO]]
def createProducer: Resource[IO, Queue.Producer[IO]]
def createSecretStore: Resource[IO, SecretStore[IO]]
def getCloudResourcesEnvVars: List[String]
}
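
This trait is the seam for running the same specs against other clouds. A hypothetical sketch of a second backend (the trait name, env var, and stubbed bodies are illustrative, not from this commit):

```scala
import cats.effect.{IO, Resource}

import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore}
import com.snowplowanalytics.snowplow.rdbloader.experimental.CloudResources

// Hypothetical second backend: only the four members need implementing.
trait PubsubTestResources extends CloudResources {
  val projectIdEnv = "TEST_LOADER_PUBSUB_PROJECT_ID" // illustrative name

  def createConsumer: Resource[IO, Queue.Consumer[IO]] = ???   // e.g. a Pub/Sub-backed consumer
  def createProducer: Resource[IO, Queue.Producer[IO]] = ???   // e.g. a Pub/Sub-backed producer
  def createSecretStore: Resource[IO, SecretStore[IO]] = ???   // e.g. a GCP secret store
  override def getCloudResourcesEnvVars: List[String] = List(projectIdEnv)
}
```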
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.experimental

import scala.concurrent.duration.DurationInt

import doobie.ConnectionIO

import cats.effect.{IO, Resource}
import cats.effect.std.Dispatcher
import cats.effect.unsafe.implicits.global

import fs2.concurrent.SignallingRef

import org.http4s.blaze.client.BlazeClientBuilder

import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, Monitoring, Transaction}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{Queue, SecretStore}
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils._
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch

import org.specs2.mutable.Specification

abstract class LoaderSpecification extends Specification with TestDAO.Provider with StorageTargetProvider with AzureTestResources {
skipAllIf(anyEnvironmentVariableMissing())
import LoaderSpecification._

def run[A](
inputBatches: List[InputBatch],
countExpectations: CountExpectations,
dbActions: TestDAO => IO[A]
): IO[(WindowsAccumulator[WindowOutput], A)] =
createResources
.use { resources =>
for {
_ <- resources.testDAO.cleanDb
windowsAccumulator <- SignallingRef.of[IO, WindowsAccumulator[WindowOutput]](WindowsAccumulator(List.empty))
consuming = consumeAllIncomingWindows[WindowOutput](
resources.queueConsumer,
countExpectations,
windowsAccumulator,
getWindowOutput = sc => IO.pure(WindowOutput(sc))
)
producing = produceInput(inputBatches, resources.producer)
_ <- consuming.concurrently(producing).compile.drain
collectedWindows <- windowsAccumulator.get
dbActionResult <- dbActions(resources.testDAO)
} yield (collectedWindows, dbActionResult)
}

def createResources: Resource[IO, TestResources] =
for {
consumer <- createConsumer
producer <- createProducer
implicit0(secretStore: SecretStore[IO]) <- createSecretStore
transaction <- createDbTransaction
testDAO = createDAO(transaction)
} yield TestResources(queueConsumer = consumer, producer = producer, testDAO = testDAO)

def createDbTransaction(implicit secretStore: SecretStore[IO]): Resource[IO, Transaction[IO, ConnectionIO]] = {
val storage: StorageTarget = createStorageTarget
val timeouts: Config.Timeouts = Config.Timeouts(
loading = 15.minutes,
nonLoading = 15.minutes,
sqsVisibility = 15.minutes,
rollbackCommit = 15.minutes,
connectionIsValid = 15.minutes
)
val readyCheck: Config.Retries = Config.Retries(
strategy = Config.Strategy.Constant,
attempts = Some(3),
backoff = 30.seconds,
cumulativeBound = None
)
for {
implicit0(dispatcher: Dispatcher[IO]) <- Dispatcher.parallel[IO]
httpClient <- BlazeClientBuilder[IO].withExecutionContext(global.compute).resource
implicit0(logging: Logging[IO]) = Logging.loggingInterpreter[IO](List())
periodicMetrics <- Resource.eval(Metrics.PeriodicMetrics.init[IO](List.empty, 1.minutes))
implicit0(monitoring: Monitoring[IO]) = Monitoring.monitoringInterpreter[IO](None, None, List.empty, None, httpClient, periodicMetrics)
transaction <- Transaction.interpreter[IO](storage, timeouts, readyCheck)
} yield transaction
}

def anyEnvironmentVariableMissing(): Boolean =
(getCloudResourcesEnvVars ::: getStorageTargetEnvVars).exists(varName => System.getenv(varName) == null)
}

object LoaderSpecification {

final case class TestResources(
queueConsumer: Queue.Consumer[IO],
producer: Queue.Producer[IO],
testDAO: TestDAO
)

final case class WindowOutput(
shredding_complete: LoaderMessage.ShreddingComplete
) extends GetShreddingComplete
}
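
To show how these pieces compose, a hedged sketch of a concrete suite (not part of this commit). The commented-out storage mixins, `queryEventCount`, and `someGeneratedEvents` are stand-ins; a real suite must also mix in concrete `TestDAO.Provider` and `StorageTargetProvider` implementations for this to compile.

```scala
import cats.effect.unsafe.implicits.global

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.InputBatch
import com.snowplowanalytics.snowplow.rdbloader.common.integrationtestutils.ItUtils.CountExpectations
import com.snowplowanalytics.snowplow.rdbloader.experimental.LoaderSpecification

// Hypothetical concrete spec; real suites also mix in a TestDAO.Provider
// and StorageTargetProvider implementation for the target warehouse.
class ExampleLoaderSpec extends LoaderSpecification /* with SomeDaoProvider with SomeTargetProvider */ {

  def someGeneratedEvents: List[Event] = Nil // stand-in for generated events

  "the loader" should {
    "load every produced event into the warehouse" in {
      val batches      = List(InputBatch(InputBatch.Content.SdkEvents(someGeneratedEvents)))
      val expectations = CountExpectations(good = someGeneratedEvents.size, bad = 0)

      val (windows, loadedCount) =
        run(batches, expectations, dao => dao.queryEventCount /* hypothetical DAO method */)
          .unsafeRunSync()

      windows.getTotalNumberOfEvents must beEqualTo(expectations.total.toLong)
      loadedCount must beEqualTo(expectations.good.toLong)
    }
  }
}
```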
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2012-2023 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.rdbloader.experimental

import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget

trait StorageTargetProvider {
def createStorageTarget: StorageTarget
def getStorageTargetEnvVars: List[String]
}