This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Read splittable LZO effectively in transformer #105

Open · wants to merge 1 commit into base: master
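The change, in short: when the new `--input-compression-format lzo` flag is passed, the transformer reads its input through Hadoop's splittable `LzoTextInputFormat` (via `newAPIHadoopFile`) instead of `sc.textFile`, so `.lzo` files that have `.lzo.index` sidecars are processed as multiple splits rather than one partition per file. A minimal, self-contained sketch of that read path, using the class names that appear in the diff below; the application name and input path are placeholders:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.sql.SparkSession

import com.hadoop.mapreduce.LzoTextInputFormat // from com.hadoop.gplcompression:hadoop-lzo

object LzoReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("lzo-read-sketch").getOrCreate()
    val sc = spark.sparkContext

    // With .lzo.index sidecar files present, LzoTextInputFormat produces one
    // split per indexed block; without them it falls back to one split per file.
    val lines = sc
      .newAPIHadoopFile(
        "s3://example-bucket/enriched/archive/", // placeholder input path
        classOf[LzoTextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      )
      .map(_._2.toString) // keep only the line text, drop the byte offset

    println(s"partitions: ${lines.getNumPartitions}")
    spark.stop()
  }
}
```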
5 changes: 4 additions & 1 deletion build.sbt
@@ -43,10 +43,12 @@ lazy val transformer = project
.settings(BuildSettings.dynamoDbSettings)
.settings(
resolvers ++= Seq(
"Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
"Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
"Maven Twitter" at "https://maven.twttr.com/" // Used for Hadoop LZO
),
libraryDependencies ++= Seq(
Dependencies.hadoop,
Dependencies.hadoopLzo,
Dependencies.spark,
Dependencies.sparkSql,
Dependencies.schemaDdl,
@@ -61,6 +63,7 @@ lazy val commonDependencies = Seq(
Dependencies.analyticsSdk,
Dependencies.fs2,
Dependencies.decline,
Dependencies.declineEnumeratum,
Dependencies.s3,
Dependencies.dynamodb,
Dependencies.enumeratum,
34 changes: 25 additions & 9 deletions core/src/main/scala/com.snowplowanalytics.snowflake.core/Cli.scala
@@ -18,18 +18,18 @@ import java.util.Base64
import cats.data.{EitherT, ValidatedNel}
import cats.effect.IO
import cats.implicits._

import io.circe.Json
import io.circe.syntax._
import io.circe.parser.{parse => jsonParse}

import com.monovore.decline.{ Opts, Command, Argument }

import com.monovore.decline.{Argument, Command, Opts}
import com.monovore.decline.enumeratum._
import com.snowplowanalytics.iglu.client.Client
import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig
import enumeratum._

import scala.collection.immutable

object Cli {
import Config._
@@ -86,16 +86,28 @@ object Cli {
def defaultMetavar: String = "base64"
}

sealed trait CompressionFormat extends EnumEntry with EnumEntry.Lowercase
object CompressionFormat extends Enum[CompressionFormat] {
case object Gzip extends CompressionFormat
case object LZO extends CompressionFormat
case object Snappy extends CompressionFormat
case object None extends CompressionFormat

val values: immutable.IndexedSeq[CompressionFormat] = findValues
}

case class Transformer(loaderConfig: Config,
igluClient: Client[IO, Json],
inbatch: Boolean,
eventsManifestConfig: Option[EventsManifestConfig])
eventsManifestConfig: Option[EventsManifestConfig],
inputCompressionFormat: Option[CompressionFormat])

object Transformer {
case class Raw(loaderConfig: Base64Encoded,
resolver: Base64Encoded,
inbatch: Boolean,
eventsManifestConfig: Option[Base64Encoded])
eventsManifestConfig: Option[Base64Encoded],
inputCompressionFormat: Option[CompressionFormat])

def parse(args: Seq[String]): EitherT[IO, String, Transformer] =
transformer
Expand All @@ -113,7 +125,7 @@ object Cli {
case Some(json) => EventsManifestConfig.parseJson[IO](igluClient, json.json).map(_.some)
case None => EitherT.rightT[IO, String](none[EventsManifestConfig])
}
} yield Transformer(cfg, igluClient, raw.inbatch, manifest)
} yield Transformer(cfg, igluClient, raw.inbatch, manifest, raw.inputCompressionFormat)

}

@@ -194,11 +206,15 @@ object Cli {

val inBatchDedupe = Opts.flag("inbatch-deduplication", "Enable in-batch natural deduplication").orFalse
val evantsManifest = Opts.option[Base64Encoded]("events-manifest", "Snowplow Events Manifest JSON config, to enable cross-batch deduplication, base64-encoded").orNone
val inputCompressionFormat = Opts.option[CompressionFormat](
"input-compression-format",
"The compression used by input files of the Spark job. Supported values: none, gzip, lzo, snappy."
).orNone

val loader = Command("snowplow-snowflake-loader", "Snowplow Database orchestrator")(load.orElse(setup).orElse(migrate))

val transformer = Command("snowplow-snowflake-transformer", "Spark job to transform enriched data to Snowflake-compatible format") {
(configEncoded, resolverEncoded, inBatchDedupe, evantsManifest).mapN(Transformer.Raw)
(configEncoded, resolverEncoded, inBatchDedupe, evantsManifest, inputCompressionFormat).mapN(Transformer.Raw)
}
}
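
A note on the CLI wiring above: decline-enumeratum derives the `Argument` instance for the enumeratum-backed `CompressionFormat`, and the `EnumEntry.Lowercase` mixin makes the accepted spellings the lowercase entry names (none, gzip, lzo, snappy). A minimal sketch of the option parsing in isolation; the enum is copied from the diff for self-containedness, and the demo command name and help strings are placeholders:

```scala
import scala.collection.immutable

import com.monovore.decline.{Command, Opts}
import com.monovore.decline.enumeratum._ // derives Argument[CompressionFormat]
import enumeratum._

sealed trait CompressionFormat extends EnumEntry with EnumEntry.Lowercase
object CompressionFormat extends Enum[CompressionFormat] {
  case object Gzip   extends CompressionFormat
  case object LZO    extends CompressionFormat
  case object Snappy extends CompressionFormat
  case object None   extends CompressionFormat

  val values: immutable.IndexedSeq[CompressionFormat] = findValues
}

object CompressionFlagDemo extends App {
  val flag = Opts
    .option[CompressionFormat]("input-compression-format", "Compression of the input files")
    .orNone

  val cmd = Command("demo", "placeholder command")(flag)

  println(cmd.parse(Seq("--input-compression-format", "lzo"))) // Right(Some(LZO))
  println(cmd.parse(Seq.empty))                                // Right(None)
  println(cmd.parse(Seq("--input-compression-format", "zstd"))) // Left(help with a parse error)
}
```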

ConfigSpec.scala
@@ -27,6 +27,7 @@ import com.snowplowanalytics.iglu.client.resolver.registries.Registry
import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig
import com.snowplowanalytics.snowflake.core.Config.SetupSteps
import com.snowplowanalytics.snowflake.core.Config.S3Folder.{coerce => s3}
import com.snowplowanalytics.snowflake.core.Cli.CompressionFormat


class ConfigSpec extends Specification {
@@ -415,7 +416,9 @@ class ConfigSpec extends Specification {
"--inbatch-deduplication",
"--resolver", resolverBase64,
"--config", encodeToBase64(config),
"--events-manifest", encodeToBase64(eventManifestConfig)).toArray
"--events-manifest", encodeToBase64(eventManifestConfig),
"--input-compression-format", "gzip"
).toArray

val expected = Cli.Transformer(
Config(
@@ -449,11 +452,12 @@
),
awsRegion = "us-west-1",
dynamodbTable = "snowplow-integration-test-crossbatch-dedupe"
))
)),
Some(CompressionFormat.Gzip)
)

Cli.Transformer.parse(args).value.unsafeRunSync() must beRight.like {
case transformer @ Cli.Transformer(_, client, _, _) =>
case transformer @ Cli.Transformer(_, client, _, _, _) =>
val updatedClient: Resolver[IO] = client.resolver.copy(cache = None)
val updatedConfig = transformer.copy(igluClient = transformer.igluClient.copy(resolver = updatedClient))
updatedConfig must beEqualTo(expected)
3 changes: 3 additions & 0 deletions project/Dependencies.scala
@@ -17,6 +17,7 @@ object Dependencies {
object V {
// Java
val hadoop = "2.8.5"
val hadoopLzo = "0.4.20"
val snowflakeJdbc = "3.11.0"
val aws = "1.11.209"
// Scala
@@ -37,6 +38,7 @@

// Java
val hadoop = "org.apache.hadoop" % "hadoop-aws" % V.hadoop % Provided
val hadoopLzo = "com.hadoop.gplcompression" % "hadoop-lzo" % V.hadoopLzo % Provided
val snowflakeJdbc = "net.snowflake" % "snowflake-jdbc" % V.snowflakeJdbc
val s3 = "com.amazonaws" % "aws-java-sdk-s3" % V.aws
val dynamodb = "com.amazonaws" % "aws-java-sdk-dynamodb" % V.aws
@@ -48,6 +50,7 @@
val sparkSql = "org.apache.spark" %% "spark-sql" % V.spark % Provided
val fs2 = "co.fs2" %% "fs2-core" % V.fs2
val decline = "com.monovore" %% "decline" % V.decline
val declineEnumeratum = "com.monovore" %% "decline-enumeratum" % V.decline
val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk
val enumeratum = "com.beachape" %% "enumeratum" % V.enumeratum
val igluClient = ("com.snowplowanalytics" %% "iglu-scala-client" % V.igluClient)
Main.scala
@@ -38,7 +38,7 @@ object Main extends IOApp {

def run(args: List[String]): IO[ExitCode] = {
Cli.Transformer.parse(args).value.flatMap {
case Right(Cli.Transformer(appConfig, igluClient, inbatch, eventsManifestConfig)) =>
case Right(Cli.Transformer(appConfig, igluClient, inbatch, eventsManifestConfig, inputCompressionFormat)) =>

// Always use the EMR role for manifest access
for {
@@ -61,7 +61,9 @@
exitCode <- runFolders match {
case Right(folders) =>
val configs = folders.map(S3Config(appConfig.input, appConfig.stageUrl, appConfig.badOutputUrl, _))
TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch, atomic).as(ExitCode.Success)
TransformerJob.run(spark, manifest, appConfig.manifest, configs, eventsManifestConfig, inbatch, atomic,
inputCompressionFormat
).as(ExitCode.Success)
case Left(error) =>
die(s"Cannot get list of unprocessed folders\n$error")
}
TransformerJob.scala
@@ -26,7 +26,10 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingData}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig

import com.snowplowanalytics.snowflake.core.Cli.CompressionFormat
import com.snowplowanalytics.snowflake.core.ProcessManifest
import com.snowplowanalytics.snowflake.transformer.singleton.EventsManifestSingleton

@@ -62,12 +65,12 @@ object TransformerJob {
)

/** Process all directories, saving state into DynamoDB */
def run(spark: SparkSession, manifest: ProcessManifest[IO], tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema): IO[Unit] =
def run(spark: SparkSession, manifest: ProcessManifest[IO], tableName: String, jobConfigs: List[TransformerJobConfig], eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema, inputCompressionFormat: Option[CompressionFormat]): IO[Unit] =
jobConfigs.traverse_ { jobConfig =>
for {
_ <- IO(System.out.println(s"Snowflake Transformer: processing ${jobConfig.runId}. ${System.currentTimeMillis()}"))
_ <- manifest.add(tableName, jobConfig.runId)
shredTypes <- IO(process(spark, jobConfig, eventsManifestConfig, inbatch, atomicSchema))
shredTypes <- IO(process(spark, jobConfig, eventsManifestConfig, inbatch, atomicSchema, inputCompressionFormat))
_ <- manifest.markProcessed(tableName, jobConfig.runId, shredTypes, jobConfig.goodOutput)
_ <- IO(System.out.println(s"Snowflake Transformer: processed ${jobConfig.runId}. ${System.currentTimeMillis()}"))
} yield ()
@@ -84,7 +87,7 @@ object TransformerJob {
* @param atomicSchema map of field names to maximum lengths
* @return list of discovered shredded types
*/
def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema) = {
def process(spark: SparkSession, jobConfig: TransformerJobConfig, eventsManifestConfig: Option[EventsManifestConfig], inbatch: Boolean, atomicSchema: Schema, inputCompressionFormat: Option[CompressionFormat]) = {
import spark.implicits._

// Decide whether bad rows will be stored or not
@@ -96,8 +99,20 @@ object TransformerJob {
val keysAggregator = new StringSetAccumulator
sc.register(keysAggregator)

val inputRDD = sc
.textFile(jobConfig.input)
val linesRDD = inputCompressionFormat match {
case Some(CompressionFormat.LZO) =>
sc.newAPIHadoopFile(
jobConfig.input,
classOf[com.hadoop.mapreduce.LzoTextInputFormat],
classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text]
).map(_._2.toString)

case _ =>
sc.textFile(jobConfig.input)
}

val inputRDD = linesRDD
.map { line =>
for {
event <- Transformer.jsonify(line)
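
An operational note on the job above: hadoop-lzo is declared Provided, so the transformer assumes the cluster (for example EMR with an LZO bootstrap action) supplies the native LZO libraries and registers the codecs with Hadoop. A hedged sketch of what that registration looks like if it had to be set from the Spark side; the codec list and application name are assumptions about the cluster setup, not part of this PR. Splitting of plain `.lzo` files additionally requires `.lzo.index` sidecars (hadoop-lzo ships an indexer to generate them); without them each file is still read as a single split.

```scala
import org.apache.spark.sql.SparkSession

object LzoCodecConfigSketch {
  def main(args: Array[String]): Unit = {
    // Normally these codecs are registered in the cluster's core-site.xml and the
    // native liblzo2 bindings are installed; this only shows the equivalent setting.
    val spark = SparkSession
      .builder()
      .appName("lzo-codec-config-sketch") // placeholder application name
      .config(
        "spark.hadoop.io.compression.codecs",
        Seq(
          "org.apache.hadoop.io.compress.GzipCodec",
          "org.apache.hadoop.io.compress.SnappyCodec",
          "com.hadoop.compression.lzo.LzoCodec",  // provided by hadoop-lzo
          "com.hadoop.compression.lzo.LzopCodec"  // codec for .lzo (LZOP) files
        ).mkString(",")
      )
      .getOrCreate()

    println(spark.sparkContext.hadoopConfiguration.get("io.compression.codecs"))
    spark.stop()
  }
}
```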
TransformerJobSpec.scala
@@ -32,8 +32,8 @@ import cats.syntax.either._
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._
import com.snowplowanalytics.iglu.client.Resolver

import com.snowplowanalytics.snowflake.core.{ Cli, idClock }
import com.snowplowanalytics.snowflake.core.Cli.CompressionFormat
import com.snowplowanalytics.snowflake.core.{Cli, idClock}
import com.snowplowanalytics.snowflake.transformer.TransformerJobConfig.FSConfig
import com.snowplowanalytics.snowplow.eventsmanifest.EventsManifestConfig

@@ -238,7 +238,7 @@ trait TransformerJobSpec extends Specification with BeforeAfterAll {
val badOutput = if (badRowsShouldBeStored) Some(dirs.badRows.toString) else None
val eventManifestConfig = if (crossBatchDedup) Some(duplicateStorageConfig) else None
val config = FSConfig(input.toString, dirs.output.toString, badOutput)
TransformerJob.process(spark, config, eventManifestConfig, inBatchDedup, atomic)
TransformerJob.process(spark, config, eventManifestConfig, inBatchDedup, atomic, Some(CompressionFormat.None))
deleteRecursively(input)
}
override def afterAll(): Unit = {