Add the ability to partition TSV by app id #288

Closed · wants to merge 1 commit
@@ -52,6 +52,8 @@ class KinesisS3Emitter(client: AmazonS3,
serializer: ISerializer)
extends IEmitter[Result] {

+ private val partitionTsvByApp = output.s3.partitionForPurpose(purpose).exists(_.contains("{app}"))

/**
* Reads items from a buffer and saves them to s3.
*
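A note on the new partitionTsvByApp flag above: a minimal sketch of the {app} detection, assuming partitionForPurpose returns the configured S3 partition template as an Option[String] (the path value here is illustrative, not a real config):

    // Hypothetical template value; partitionForPurpose(purpose) is assumed
    // to yield Option[String] holding the configured partition path.
    val template: Option[String] = Some("enriched/{app}/{yy+}/{mm}/{dd}")
    val partitionTsvByApp = template.exists(_.contains("{app}"))
    // => true; with None, or a template lacking "{app}", it stays false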
@@ -67,9 +69,9 @@ class KinesisS3Emitter(client: AmazonS3,

val records = buffer.getRecords.asScala.toList
val partitionedBatch =
- Common.partition(purpose, monitoring.isStatsDEnabled, records)
+ Common.partition(purpose, partitionTsvByApp, monitoring.isStatsDEnabled, records)

- val getBase: Option[RowType.SelfDescribing] => String =
+ val getBase: Option[RowType] => String =
getBaseFilename(output.s3, purpose, buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, LocalDateTime.now)
val afterEmit: () => Unit =
() => monitoring.report(partitionedBatch.meta)
@@ -78,7 +80,7 @@ class KinesisS3Emitter(client: AmazonS3,
case (RowType.Unpartitioned, partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(None))
.map(_.asLeft)
- case (data: RowType.SelfDescribing, partitionRecords) if partitionRecords.nonEmpty =>
+ case (data @ (_: RowType.SelfDescribing | _: RowType.Tsv), partitionRecords) if partitionRecords.nonEmpty =>
emitRecords(partitionRecords, afterEmit, getBase(Some(data))).map(_.asLeft)
case _ =>
records // ReadingError or empty partition - should be handled later by serializer
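The new case uses Scala's binder-with-alternatives pattern: `x @ (_: A | _: B)` binds `x` whenever either type test matches, so both SelfDescribing and Tsv rows flow into the same emit path. A self-contained sketch with stand-in types:

    // Stand-in example of the `x @ (_: A | _: B)` pattern used above.
    def describe(x: Any): String = x match {
      case n @ (_: Int | _: Long) => s"integral: $n"
      case _                      => "other"
    }
    describe(42L) // => "integral: 42"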
@@ -238,14 +240,17 @@ object KinesisS3Emitter {
lastSeq: String,
now: LocalDateTime
)(
- sdj: Option[RowType.SelfDescribing]
+ row: Option[RowType]
): String = {
+ val sdj = row.collect { case s: RowType.SelfDescribing => s }
+ val app = row.collect { case a: RowType.Tsv => a }
val partitionPath = s3Config.partitionForPurpose(purpose).map {
_.template("vendor", sdj.fold("unknown")(_.vendor))
.template("name", sdj.fold("unknown")(_.name))
.template("schema", sdj.fold("unknown")(_.name)) // allowed synonym
.template("format", sdj.fold("unknown")(_.format))
.template("model", sdj.fold(-1)(_.model).toString)
.template("app", app.fold("unknown")(_.appId))
.template("yy+", now.format(DateTimeFormatter.ofPattern("yyyy")))
.template("mm", now.format(DateTimeFormatter.ofPattern("MM")))
.template("dd", now.format(DateTimeFormatter.ofPattern("dd")))
@@ -31,4 +31,9 @@ package object loader {
* Final result of S3 Loader processing
*/
type Result = Either[GenericError, RawRecord]

+ /**
+  * The result of S3 Loader processing with a potentially parsed record
+  */
+ type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]
}
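To illustrate the new alias: a standalone sketch with stand-in definitions for RawRecord and GenericError (the real ones live elsewhere in the loader and in the badrows library):

    type RawRecord = Array[Byte]                   // stand-in for the loader's alias
    final case class GenericError(message: String) // stand-in for the badrows type
    type ParsedResult = Either[GenericError, (RawRecord, Option[Array[String]])]

    val raw: RawRecord = "shop\tweb\t2020-11-26 00:01:05".getBytes("UTF-8")
    val eager: ParsedResult   = Right((raw, Some(new String(raw, "UTF-8").split("\t", -1))))
    val skipped: ParsedResult = Right((raw, None)) // no StatsD, no {app}: bytes pass through unparsed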
@@ -13,9 +13,7 @@
package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
- import java.nio.charset.StandardCharsets.UTF_8

- import com.snowplowanalytics.s3.loader.Result
+ import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.processing.Batch.Meta

/** Content of a KCL buffer with metadata attached */
@@ -34,20 +32,19 @@ object Batch {

val EmptyMeta: Meta = Meta(None, 0)

- def fromEnriched(inputs: List[Result]): Batch[List[Result]] = {
+ def fromEnriched(inputs: List[ParsedResult]): Batch[List[ParsedResult]] = {
val meta = inputs.foldLeft(EmptyMeta) {
case (Meta(tstamp, count), Left(_)) =>
Meta(tstamp, count + 1)
- case (Meta(tstamp, count), Right(raw)) =>
-   val strRecord = new String(raw, UTF_8)
-   val extracted = Common.getTstamp(strRecord).toOption
+ case (Meta(tstamp, count), Right((_, array))) =>
+   val extracted = array.flatMap(Common.getTstamp(_).toOption)
val min = Common.compareTstamps(tstamp, extracted)
Meta(min, count + 1)
}

Batch(meta, inputs)
}

- def from(inputs: List[Result]): Batch[List[Result]] =
+ def from[R](inputs: List[R]): Batch[List[R]] =
Batch(EmptyMeta, inputs)
}
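The essence of the fromEnriched fold above, in miniature: walk the batch once, tracking the earliest collector timestamp seen plus a running count. Simplified stand-in types; assumes Scala 2.13 for minOption:

    import java.time.Instant
    final case class Meta(earliest: Option[Instant], count: Int) // stand-in

    val tstamps: List[Option[Instant]] = List(
      Some(Instant.parse("2020-11-26T00:02:05Z")),
      None, // an unparseable row still counts
      Some(Instant.parse("2020-11-26T00:01:05Z"))
    )
    tstamps.foldLeft(Meta(None, 0)) { case (Meta(cur, n), t) =>
      Meta((cur.toList ++ t.toList).minOption, n + 1)
    }
    // => Meta(Some(2020-11-26T00:01:05Z), 3)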
@@ -14,15 +14,11 @@ package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
import java.nio.charset.StandardCharsets.UTF_8

import cats.syntax.either._

import io.circe.parser.parse

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.circe.implicits._

- import com.snowplowanalytics.s3.loader.Result
+ import com.snowplowanalytics.s3.loader.{ParsedResult, Result}
import com.snowplowanalytics.s3.loader.Config.Purpose
import com.snowplowanalytics.s3.loader.monitoring.StatsD.CollectorTstampIdx

@@ -40,14 +36,29 @@ object Common {
*/
def partition(
purpose: Purpose,
+ partitionTsvByApp: Boolean,
statsDEnabled: Boolean,
records: List[Result]
): Batch.Partitioned =
purpose match {
case Purpose.SelfDescribingJson =>
Batch.from(records).map(rs => partitionByType(rs).toList)
- case Purpose.Enriched if statsDEnabled =>
-   Batch.fromEnriched(records).map(rs => List((RowType.Unpartitioned, rs)))
+ case Purpose.Enriched =>
+   // We need to parse the record from bytes to Array[String] to obtain time stats (for StatsD),
+   // as well as for partitioning by app id
+   val parsed = records.map(toParsedRecord(_, actuallyParse = statsDEnabled || partitionTsvByApp))
+   val batch =
+     if (statsDEnabled)
+       Batch.fromEnriched(parsed)
+     else
+       Batch.from(parsed)
+   if (partitionTsvByApp)
+     batch.map(rs =>
+       partitionByApp(rs).toList.map { case (row, records) =>
+         (row, records.map(fromParsedRecord))
+       })
+   else
+     batch.map(rs => List((RowType.Unpartitioned, rs.map(fromParsedRecord))))
case _ =>
Batch.from(records).map(rs => List((RowType.Unpartitioned, rs)))
}
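Putting the Enriched branch's gating in one place: the TSV is split only when at least one consumer of the parsed columns exists. A small sketch of that decision:

    // Parsing costs a String allocation and a split per record, so it is
    // gated on the two features that actually need the columns.
    def shouldParse(statsDEnabled: Boolean, partitionTsvByApp: Boolean): Boolean =
      statsDEnabled || partitionTsvByApp

    assert(shouldParse(statsDEnabled = false, partitionTsvByApp = true))  // partitioning alone forces the parse
    assert(!shouldParse(statsDEnabled = false, partitionTsvByApp = false)) // raw bytes pass through untouched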
@@ -70,16 +81,31 @@ object Common {
case Left(_) => RowType.ReadingError
}

+ def toParsedRecord(record: Result, actuallyParse: Boolean): ParsedResult =
+   record.map { byteArray =>
+     val parsed = if (actuallyParse) Some(new String(byteArray, UTF_8).split("\t", -1)) else None
+     (byteArray, parsed)
+   }
+
+ def fromParsedRecord(record: ParsedResult): Result = record.map(_._1)
+
+ def partitionByApp(records: List[ParsedResult]): Map[RowType, List[ParsedResult]] =
+   records.groupBy {
+     case Right((_, array)) =>
+       // if there are no tabs, avoid returning the whole string
+       val appId = array.flatMap(_.headOption.filter(_.size > 1))
+       appId.fold[RowType](RowType.Unpartitioned)(RowType.Tsv)
+     case Left(_) => RowType.ReadingError
+   }
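A standalone sketch of the grouping idea behind partitionByApp, assuming the enriched TSV carries app_id in its first column (as in Snowplow's enriched event format); stand-in data, no loader types:

    val parsed: List[Array[String]] = List(
      Array("shop", "web", "2020-11-26 00:01:05"),
      Array("blog", "web", "2020-11-26 00:02:05"),
      Array("shop", "web", "2020-11-26 00:03:05")
    )
    parsed.groupBy(_.headOption)
    // => Map(Some("shop") -> 2 rows, Some("blog") -> 1 row); the real code
    // wraps each key in RowType.Tsv and falls back to Unpartitioned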

/** Extract a timestamp from enriched TSV line */
- def getTstamp(row: String): Either[RuntimeException, Instant] = {
-   val array = row.split("\t", -1)
+ def getTstamp(array: Array[String]): Either[RuntimeException, Instant] =
for {
string <- Either
.catchOnly[IndexOutOfBoundsException](array(CollectorTstampIdx))
.map(_.replaceAll(" ", "T") + "Z")
tstamp <- Either.catchOnly[DateTimeParseException](Instant.parse(string))
} yield tstamp
- }
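getTstamp in isolation: enriched TSV stores collector_tstamp as "yyyy-MM-dd HH:mm:ss", so swapping the space for 'T' and appending 'Z' yields an ISO-8601 instant. The column index below is illustrative, not the real CollectorTstampIdx:

    import java.time.Instant
    val cols = Array("shop", "web", "ev-1", "2020-11-26 00:01:05")
    val iso  = cols(3).replaceAll(" ", "T") + "Z" // index 3 is illustrative
    Instant.parse(iso) // => 2020-11-26T00:01:05Z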

def compareTstamps(a: Option[Instant], b: Option[Instant]): Option[Instant] =
(a, b) match {
@@ -23,6 +23,9 @@ object RowType {
*/
case object Unpartitioned extends RowType

+ /** TSV line with payload that can be partitioned */
+ final case class Tsv(appId: String) extends RowType

/** JSON line with self-describing payload that can be partitioned */
final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType
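A stub of the full ADT for reference, since the diff elides the other cases; the names mirror the diff, but the exact definitions (notably ReadingError) are assumed:

    sealed trait RowType
    object RowType {
      case object Unpartitioned extends RowType
      final case class Tsv(appId: String) extends RowType
      final case class SelfDescribing(vendor: String, name: String, format: String, model: Int) extends RowType
      case object ReadingError extends RowType // assumed shape
    }
    val key: RowType = RowType.Tsv("shop") // one partition (and S3 object) per app id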

@@ -14,16 +14,12 @@ package com.snowplowanalytics.s3.loader.processing

import java.time.Instant
import java.util.UUID

import cats.data.NonEmptyList
import cats.syntax.either._

import com.snowplowanalytics.snowplow.badrows.BadRow.GenericError
import com.snowplowanalytics.snowplow.badrows.Failure.GenericFailure
import com.snowplowanalytics.snowplow.badrows.Payload.RawPayload

- import com.snowplowanalytics.s3.loader.{Result, S3Loader}
+ import com.snowplowanalytics.s3.loader.{ParsedResult, Result, S3Loader}
import org.specs2.mutable.Specification

class BatchSpec extends Specification {
@@ -37,20 +33,22 @@

"fromEnriched" should {
"extract the earliest timestamp" in {
- val input: List[Result] = List(
+ val input: List[ParsedResult] = List(
BatchSpec.getEvent("2020-11-26 00:02:05"),
BatchSpec.getEvent("2020-11-26 00:01:05"),
BatchSpec.getEvent("2020-11-26 00:03:05")
- ).map(_.getBytes.asRight)
+ ).map(_.getBytes.asRight).map(Common.toParsedRecord(_, actuallyParse = true))

val expected = Batch.Meta(Some(Instant.parse("2020-11-26T00:01:05Z")), 3)

Batch.fromEnriched(input).meta must beEqualTo(expected)
}

"ignore invalid TSVs for timestamps, but preserve for count" in {
- val input: List[Result] =
-   List("invalid event", "rubbish").map(_.getBytes.asRight)
+ val input: List[ParsedResult] =
+   List("invalid event", "rubbish")
+     .map(_.getBytes.asRight)
+     .map(Common.toParsedRecord(_, actuallyParse = true))

val expected = Batch(Batch.Meta(None, 2), input)

@@ -70,7 +70,7 @@ class CommonSpec extends Specification {

"getTimestamp" should {
"parse timestamp in proper format" in {
- val input = List.fill(4)("2020-11-26 00:01:05").mkString("\t")
+ val input = List.fill(4)("2020-11-26 00:01:05").toArray
val expected = Instant.parse("2020-11-26T00:01:05Z")
Common.getTstamp(input) must beRight(expected)
}
@@ -79,13 +79,13 @@
"partition" should {
"add metadata for enriched if statsd is enabled" in {
val input = List("".getBytes.asRight)
- val result = Common.partition(Config.Purpose.Enriched, true, input)
+ val result = Common.partition(Config.Purpose.Enriched, false, true, input)
result.meta should beEqualTo(Batch.Meta(None, 1))
}

"not add metadata for enriched if statsd is disabled" in {
val input = List("".getBytes.asRight)
- val result = Common.partition(Config.Purpose.Enriched, false, input)
+ val result = Common.partition(Config.Purpose.Enriched, false, false, input)
result.meta should beEqualTo(Batch.EmptyMeta)
}

Expand All @@ -94,7 +94,7 @@ class CommonSpec extends Specification {

val input = List(dataType11.asRight, dataType21.asRight)
val result =
- Common.partition(Config.Purpose.SelfDescribingJson, false, input)
+ Common.partition(Config.Purpose.SelfDescribingJson, false, false, input)
result should beEqualTo(
Batch(
Batch.EmptyMeta,
@@ -42,8 +42,8 @@ class GZipSerializerSpec extends Specification {
cleanup()

val binaryInputs = List(
(List("A", "B", 1000, "a", "b"):List[Any]).mkString("\t").getBytes.asRight,
(List("X", "Y", 2000, "x", "y"):List[Any]).mkString("\t").getBytes.asRight
(List("A", "B", 1000, "a", "b"): List[Any]).mkString("\t").getBytes.asRight,
(List("X", "Y", 2000, "x", "y"): List[Any]).mkString("\t").getBytes.asRight
)

val serializationResult =