diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala
index b7be1783d..5810c7029 100644
--- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala
+++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/TransformerSpecification.scala
@@ -189,13 +189,13 @@ abstract class TransformerSpecification extends Specification with AppDependenci
       .through(decompressIfNeeded(blob.key))
       .compile
       .to(Array)
-      .flatMap(convertBlobToListOfRows)
+      .flatMap(convertBlobToListOfRows(blob.key))
 
-  private def convertBlobToListOfRows(blob: Blob): IO[List[DataRow]] =
-    requiredAppConfig.fileFormat match {
-      case WideRowFormat.JSON => OutputDataRowReader.fromJson(blob)
-      case WideRowFormat.PARQUET => OutputDataRowReader.fromParquet(blob)
-    }
+  private def convertBlobToListOfRows(blobKey: BlobStorage.Key)(blob: Blob): IO[List[DataRow]] =
+    if (isBlobGoodParquetData(blobKey))
+      OutputDataRowReader.fromParquet(blob)
+    else
+      OutputDataRowReader.fromJson(blob)
 
   // Decompress only for:
   // - JSON good/bad output
diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/generic.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/generic.scala
deleted file mode 100644
index aef803851..000000000
--- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/generic.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios
-
-import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat
-import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{bad, good}
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{AppConfiguration, AzureTransformerSpecification}
-
-import scala.concurrent.duration.DurationDouble
-
-// JSON + GZIP
-class GenericScenario1 extends AzureTransformerSpecification {
-  def description = "Input: 1 good, config: JSON, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default
-  def inputBatches = List(good(count = 1))
-  def countExpectations = CountExpectations(good = 1, bad = 0)
-}
-
-class GenericScenario2 extends AzureTransformerSpecification {
-  def description = "Input: 1 bad, config: JSON, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default
-  def inputBatches = List(bad(count = 1))
-  def countExpectations = CountExpectations(good = 0, bad = 1)
-}
-
-class GenericScenario3 extends AzureTransformerSpecification {
-  def description = "Input: 10000 good, config: JSON, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default
-  def inputBatches = List(good(count = 10000))
-  def countExpectations = CountExpectations(good = 10000, bad = 0)
-}
-
-class GenericScenario4 extends AzureTransformerSpecification {
-  def description = "Input: 10000 bad, config: JSON, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default
-  def inputBatches = List(bad(count = 10000))
-  def countExpectations = CountExpectations(good = 0, bad = 10000)
-}
-
-// PARQUET + GZIP
-class GenericScenario5 extends AzureTransformerSpecification {
-  def description = "Input: 1 good, config: PARQUET, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
-  def inputBatches = List(good(count = 1))
-  def countExpectations = CountExpectations(good = 1, bad = 0)
-}
-
-class GenericScenario6 extends AzureTransformerSpecification {
-  def description = "Input: 1 bad, config: PARQUET, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
-  def inputBatches = List(bad(count = 1))
-  def countExpectations = CountExpectations(good = 0, bad = 1)
-}
-
-class GenericScenario7 extends AzureTransformerSpecification {
-  def description = "Input: 10000 good, config: PARQUET, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
-  def inputBatches = List(good(count = 10000))
-  def countExpectations = CountExpectations(good = 10000, bad = 0)
-}
-
-class GenericScenario8 extends AzureTransformerSpecification {
-  def description = "Input: 10000 bad, config: PARQUET, compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
-  def inputBatches = List(bad(count = 10000))
-  def countExpectations = CountExpectations(good = 0, bad = 10000)
-}
-
-// Windowing - 2 minutes
-class GenericScenario9 extends AzureTransformerSpecification {
-  def description = """Input: mixed 5000 good and 5000 badconfig: JSON, compression, windowing: 2 minutes"""
-  def requiredAppConfig = AppConfiguration.default.copy(windowFrequencyMinutes = 2)
-  def inputBatches = List(good(count = 5000), bad(count = 5000).delayed(2.minutes))
-  def countExpectations = CountExpectations(good = 5000, bad = 5000)
-}
-
-// JSON, GZIP disabled
-class GenericScenario10 extends AzureTransformerSpecification {
-  def description = "Input: 1 good, config: JSON, no compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(compression = Compression.None)
-  def inputBatches = List(good(count = 1))
-  def countExpectations = CountExpectations(good = 1, bad = 0)
-}
-
-// PARQUET, GZIP disabled
-class GenericScenario600 extends AzureTransformerSpecification {
-  def description = "Input: 1 good, config: JSON, no compression, windowing: 1 minute"
-  def requiredAppConfig = AppConfiguration.default.copy(compression = Compression.None, fileFormat = WideRowFormat.PARQUET)
-  def inputBatches = List(good(count = 1))
-  def countExpectations = CountExpectations(good = 1, bad = 0)
-}
diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/JsonOutputDetailsScenario.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala
similarity index 77%
rename from modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/JsonOutputDetailsScenario.scala
rename to modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala
index 8674faee3..fd248d9a2 100644
--- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/JsonOutputDetailsScenario.scala
+++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/json.scala
@@ -1,6 +1,6 @@
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios
-
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content
+import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{
   AppConfiguration,
@@ -9,6 +9,60 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
 }
 import io.circe.parser
 
+import scala.concurrent.duration.DurationInt
+
+class JsonScenario1 extends AzureTransformerSpecification {
+  def description = "Input: 1 good, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 1))
+  def countExpectations = CountExpectations(good = 1, bad = 0)
+}
+
+class JsonScenario2 extends AzureTransformerSpecification {
+  def description = "Input: 1 bad, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(bad(count = 1))
+  def countExpectations = CountExpectations(good = 0, bad = 1)
+}
+
+class JsonScenario3 extends AzureTransformerSpecification {
+  def description = "Input: 10000 good, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 10000))
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+class JsonScenario4 extends AzureTransformerSpecification {
+  def description = "Input: 10000 bad, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(bad(count = 10000))
+  def countExpectations = CountExpectations(good = 0, bad = 10000)
+}
+
+class JsonScenario5 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: JSON, compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}
+
+//Changed default windowing to 2 minutes
+class JsonScenario6 extends AzureTransformerSpecification {
+  def description = """Input: 2 x 5000 good, second batch delayed by 2 minutes, config: JSON, compression, windowing: 2 minutes"""
+  def requiredAppConfig = AppConfiguration.default.copy(windowFrequencyMinutes = 2)
+  def inputBatches = List(good(count = 5000), good(count = 5000).delayed(2.minutes)) // force new window by delaying second input batch
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+//No compression
+class JsonScenario7 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: JSON, no compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(compression = Compression.None)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}
+
+//Checking details of JSON output
 class JsonOutputDetailsScenario extends AzureTransformerSpecification {
 
   private val goodEvent = Content.TextLines(
diff --git a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/ParquetOutputDetailsScenario.scala b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala
similarity index 83%
rename from modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/ParquetOutputDetailsScenario.scala
rename to modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala
index 1abfb3072..3b45153f8 100644
--- a/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/ParquetOutputDetailsScenario.scala
+++ b/modules/transformer-kafka/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/kafka/experimental/scenarios/parquet.scala
@@ -1,7 +1,8 @@
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios
 
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content
+import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{
   AppConfiguration,
@@ -10,6 +11,60 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
 }
 import io.circe.parser
 
+import scala.concurrent.duration.DurationInt
+
+class ParquetScenario1 extends AzureTransformerSpecification {
+  def description = "Input: 1 good, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 1))
+  def countExpectations = CountExpectations(good = 1, bad = 0)
+}
+
+class ParquetScenario2 extends AzureTransformerSpecification {
+  def description = "Input: 1 bad, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(bad(count = 1))
+  def countExpectations = CountExpectations(good = 0, bad = 1)
+}
+
+class ParquetScenario3 extends AzureTransformerSpecification {
+  def description = "Input: 10000 good, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 10000))
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+class ParquetScenario4 extends AzureTransformerSpecification {
+  def description = "Input: 10000 bad, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(bad(count = 10000))
+  def countExpectations = CountExpectations(good = 0, bad = 10000)
+}
+
+class ParquetScenario5 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: PARQUET, compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}
+
+//Changed default windowing to 2 minutes
+class ParquetScenario6 extends AzureTransformerSpecification {
+  def description = """Input: 2 x 5000 good, second batch delayed by 2 minutes, config: PARQUET, compression, windowing: 2 minutes"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, windowFrequencyMinutes = 2)
+  def inputBatches = List(good(count = 5000), good(count = 5000).delayed(2.minutes)) // force new window by delaying second input batch
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+//No compression
+class ParquetScenario7 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: PARQUET, no compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, compression = Compression.None)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}
+
+//Checking details of parquet output
 class ParquetOutputDetailsScenario extends AzureTransformerSpecification {
 
   private val goodEvent = Content.TextLines(