Commit 64b5ec2
Fix reading output rows + group scenarios by json/parquet
pondzix committed Jul 10, 2023
1 parent ef87621 commit 64b5ec2
Showing 4 changed files with 118 additions and 100 deletions.
@@ -189,13 +189,13 @@ abstract class TransformerSpecification extends Specification with AppDependenci
       .through(decompressIfNeeded(blob.key))
       .compile
       .to(Array)
-      .flatMap(convertBlobToListOfRows)
+      .flatMap(convertBlobToListOfRows(blob.key))
 
-  private def convertBlobToListOfRows(blob: Blob): IO[List[DataRow]] =
-    requiredAppConfig.fileFormat match {
-      case WideRowFormat.JSON => OutputDataRowReader.fromJson(blob)
-      case WideRowFormat.PARQUET => OutputDataRowReader.fromParquet(blob)
-    }
+  private def convertBlobToListOfRows(blobKey: BlobStorage.Key)(blob: Blob): IO[List[DataRow]] =
+    if (isBlobGoodParquetData(blobKey))
+      OutputDataRowReader.fromParquet(blob)
+    else
+      OutputDataRowReader.fromJson(blob)
 
   // Decompress only for:
   // - JSON good/bad output
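The new dispatch depends on an isBlobGoodParquetData helper defined outside this hunk. The old fileFormat match was the bug being fixed: bad rows are emitted as JSON even when the transformer is configured for Parquet, so only good-output blobs can be Parquet, and the reader has to branch on the blob key rather than on the configured format alone. A minimal sketch of what such a helper might look like, assuming the blob key encodes whether it belongs to good output (the path fragment below is an assumed convention, not the repository's verified layout):

  // Hypothetical sketch -- the real helper is not shown in this hunk.
  // Only good output can be Parquet; bad rows are always written as JSON.
  private def isBlobGoodParquetData(blobKey: BlobStorage.Key): Boolean =
    requiredAppConfig.fileFormat == WideRowFormat.PARQUET &&
      blobKey.toString.contains("output=good") // assumed key layout for good output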

This file was deleted.

@@ -1,6 +1,6 @@
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios
 
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content
+import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{
   AppConfiguration,
@@ -9,6 +9,60 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
 }
 import io.circe.parser

+import scala.concurrent.duration.DurationInt
+
+class JsonScenario1 extends AzureTransformerSpecification {
+  def description = "Input: 1 good, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 1))
+  def countExpectations = CountExpectations(good = 1, bad = 0)
+}
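Each scenario is fully described by four members: a human-readable description, the app configuration under test, the input batches to feed in, and the expected output counts. The base specification presumably runs the transformer over inputBatches and compares the rows it reads back against countExpectations; a plausible shape for that holder (the total helper is added here for illustration only):

  // Hypothetical sketch of the expectations carrier used by every scenario.
  final case class CountExpectations(good: Int, bad: Int) {
    def total: Int = good + bad // convenience; assumed, not necessarily in the real API
  }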

+class JsonScenario2 extends AzureTransformerSpecification {
+  def description = "Input: 1 bad, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(bad(count = 1))
+  def countExpectations = CountExpectations(good = 0, bad = 1)
+}
+
+class JsonScenario3 extends AzureTransformerSpecification {
+  def description = "Input: 10000 good, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 10000))
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+class JsonScenario4 extends AzureTransformerSpecification {
+  def description = "Input: 10000 bad, config: JSON, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(bad(count = 10000))
+  def countExpectations = CountExpectations(good = 0, bad = 10000)
+}
+
+class JsonScenario5 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: JSON, compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}

+// Changed default windowing to 2 minutes
+class JsonScenario6 extends AzureTransformerSpecification {
+  def description = """Input: 2 batches of 5000 good each (second delayed by 2 minutes), config: JSON, compression, windowing: 2 minutes"""
+  def requiredAppConfig = AppConfiguration.default.copy(windowFrequencyMinutes = 2)
+  def inputBatches = List(good(count = 5000), good(count = 5000).delayed(2.minutes)) // force a new window by delaying the second input batch
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
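The delayed combinator is what makes this scenario span two windows: the first batch lands in the first two-minute window, and the second is held back until that window has closed, so the transformer must emit two separate outputs. A self-contained sketch of the assumed InputBatch shape behind the good/bad/delayed calls (all definitions here are stand-ins; the real ones live elsewhere in this module and may differ):

  import scala.concurrent.duration._

  // Stand-in for the real Content type; only TextLines appears in this diff.
  sealed trait Content
  object Content { final case class TextLines(lines: List[String]) extends Content }

  final case class InputBatch(content: Content, delay: FiniteDuration = Duration.Zero) {
    // Deliver this batch after `d`, pushing it into a later window
    def delayed(d: FiniteDuration): InputBatch = copy(delay = d)
  }

  object InputBatch {
    def good(count: Int): InputBatch = InputBatch(Content.TextLines(List.fill(count)("<valid enriched event>")))
    def bad(count: Int): InputBatch  = InputBatch(Content.TextLines(List.fill(count)("<malformed payload>")))
  }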

+// No compression
+class JsonScenario7 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: JSON, no compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(compression = Compression.None)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}

+// Checking details of JSON output
+class JsonOutputDetailsScenario extends AzureTransformerSpecification {
+
+  private val goodEvent = Content.TextLines(
@@ -1,7 +1,8 @@
 package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.scenarios
 
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat
-import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.Content
+import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.InputBatch.{Content, bad, good}
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.TransformerSpecification.CountExpectations
 import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experimental.{
   AppConfiguration,
@@ -10,6 +11,60 @@ import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.kafka.experim
 }
 import io.circe.parser

+import scala.concurrent.duration.DurationInt
+
+class ParquetScenario1 extends AzureTransformerSpecification {
+  def description = "Input: 1 good, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 1))
+  def countExpectations = CountExpectations(good = 1, bad = 0)
+}
+
+class ParquetScenario2 extends AzureTransformerSpecification {
+  def description = "Input: 1 bad, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(bad(count = 1))
+  def countExpectations = CountExpectations(good = 0, bad = 1)
+}
+
+class ParquetScenario3 extends AzureTransformerSpecification {
+  def description = "Input: 10000 good, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 10000))
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}
+
+class ParquetScenario4 extends AzureTransformerSpecification {
+  def description = "Input: 10000 bad, config: PARQUET, compression, windowing: 1 minute"
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(bad(count = 10000))
+  def countExpectations = CountExpectations(good = 0, bad = 10000)
+}
+
+class ParquetScenario5 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: PARQUET, compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}

+// Changed default windowing to 2 minutes
+class ParquetScenario6 extends AzureTransformerSpecification {
+  def description = """Input: 2 batches of 5000 good each (second delayed by 2 minutes), config: PARQUET, compression, windowing: 2 minutes"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, windowFrequencyMinutes = 2)
+  def inputBatches = List(good(count = 5000), good(count = 5000).delayed(2.minutes)) // force a new window by delaying the second input batch
+  def countExpectations = CountExpectations(good = 10000, bad = 0)
+}

+// No compression
+class ParquetScenario7 extends AzureTransformerSpecification {
+  def description = """Input: mixed 5000 good and 5000 bad, config: PARQUET, no compression, windowing: 1 minute"""
+  def requiredAppConfig = AppConfiguration.default.copy(fileFormat = WideRowFormat.PARQUET, compression = Compression.None)
+  def inputBatches = List(good(count = 5000), bad(count = 5000))
+  def countExpectations = CountExpectations(good = 5000, bad = 5000)
+}

+// Checking details of Parquet output
+class ParquetOutputDetailsScenario extends AzureTransformerSpecification {
+
+  private val goodEvent = Content.TextLines(
