This repository has been archived by the owner on Sep 26, 2023. It is now read-only.
Avoid loading whole DynamoDB manifest into memory (close #116)
chuwy committed Sep 7, 2020
1 parent 0e8bfb7 commit ea8f1cd
Showing 7 changed files with 124 additions and 103 deletions.
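The heart of the change shows in the ProcessManifest trait below: scanning the DynamoDB manifest no longer returns an in-memory List wrapped in Either, but an fs2 Stream that pulls pages lazily, with failures raised through the effect type. As a rough, self-contained sketch of that pagination pattern (Page, fetchPage, and the String item type are illustrative stand-ins, not the real AWS SDK calls):

    import cats.effect.IO
    import fs2.Stream

    object PaginatedScanSketch {
      // Illustrative stand-ins for a DynamoDB result page and its fetch call
      final case class Page(items: List[String], lastEvaluatedKey: Option[String])

      def fetchPage(startKey: Option[String]): IO[Page] = ???

      // Chain pages into one lazy stream, mirroring the runRequest/runStream
      // helpers in the diff: emit a page's items, then either recurse on the
      // continuation key or terminate with Stream.empty
      def scanAll(startKey: Option[String]): Stream[IO, String] =
        Stream.eval(fetchPage(startKey)).flatMap { page =>
          Stream.emits(page.items) ++ (page.lastEvaluatedKey match {
            case Some(key) => scanAll(Some(key))
            case None      => Stream.empty
          })
        }
    }

Callers then decide how much to materialise: a fold keeps only an accumulator in memory, while .compile.toList (as the updated tests do) deliberately buffers everything.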
@@ -41,12 +41,12 @@ import com.snowplowanalytics.snowflake.generated.ProjectMetadata
 trait ProcessManifest[F[_]] {
   // Loader-specific functions
   def markLoaded(tableName: String, runId: String): F[Unit]
-  def scan(tableName: String): F[Either[String, List[RunId]]]
+  def scan(tableName: String): Stream[F, RunId]

   // Transformer-specific functions
   def add(tableName: String, runId: String): F[Unit]
   def markProcessed(tableName: String, runId: String, shredTypes: List[String], outputPath: String): F[Unit]
-  def getUnprocessed(manifestTable: String, enrichedInput: S3Folder): F[Either[String, List[String]]]
+  def getUnprocessed(manifestTable: String, enrichedInput: S3Folder): F[List[String]]
 }

 /**
@@ -133,10 +133,6 @@ object ProcessManifest {
       Sync[F].raiseError(new IllegalStateException(s"Calling destructive add method in DryRun (table $tableName, runId: $runId)"))
   }

-  /** Check if set of run ids contains particular folder */
-  def contains(state: List[RunId], folder: String): Boolean =
-    state.map(folder => folder.runId).contains(folder)
-
   /** Common implementation */
   private class AwsManifest[F[_]: Sync: Clock](state: AppState[F]) extends ProcessManifest[F] {
     def markLoaded(tableName: String, runId: String): F[Unit] =
@@ -153,7 +149,7 @@ object ProcessManifest {
         _ <- runDynamoDbQuery[F, Unit](state, query)
       } yield ()

-    def scan(tableName: String): F[Either[String, List[RunId]]] = {
+    def scan(tableName: String): Stream[F, RunId] = {
       def getRequest =
         Sync[F].delay(new ScanRequest().withTableName(tableName))
@@ -183,14 +179,12 @@ object ProcessManifest {
             Stream.empty
         }

-      val stream = runStream(runRequest)
+      runStream(runRequest)
         .flatMap { result => Stream.emits(result.getItems.asScala.toList) }
         .flatMap { item => RunId.parse(item.asScala) match {
           case Right(runId) => Stream.emit(runId)
           case Left(error) => Stream.raiseError[F](new RuntimeException(error))
         }}
-
-      stream.compile.toList.attempt.map { either => either.leftMap(_.getMessage) }
     }

     def add(tableName: String, runId: String): F[Unit] =
@@ -226,16 +220,14 @@ object ProcessManifest {
         _ <- runDynamoDbQuery[F, Unit](state, query)
       } yield ()

-    def getUnprocessed(manifestTable: String, enrichedInput: S3Folder): F[Either[String, List[String]]] =
+    def getUnprocessed(manifestTable: String, enrichedInput: S3Folder): F[List[String]] =
       for {
         s3 <- state.get.map(_.s3)
         allRuns <- Sync[F].delay(RunManifests.listRunIds(s3, enrichedInput.path))
-        result <- scan(manifestTable).map {
-          case Right(state) => Right(allRuns.filterNot(run => contains(state, run)))
-          case Left(error) => Left(error)
+        result <- scan(manifestTable).compile.fold(allRuns.toSet) { (s3Runs, manifestRun) =>
+          s3Runs - manifestRun.runId
         }
-      } yield result
+      } yield result.toList

     // Conversions should not be necessary as realTime returns TZ-independent timestamp
     private def getUtcSeconds: F[Int] =
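The new getUnprocessed folds the manifest stream straight into a set difference, so memory is bounded by the S3 run listing rather than by the manifest size. A minimal sketch of the same fold, assuming a Run stand-in for RunId:

    import cats.effect.IO
    import fs2.Stream

    object UnprocessedSketch {
      final case class Run(runId: String) // stand-in for the real RunId

      // Start from every run id found on S3 and subtract each run id the
      // manifest stream emits; whatever survives was never processed
      def unprocessed(s3Runs: List[String], manifest: Stream[IO, Run]): IO[List[String]] =
        manifest
          .compile
          .fold(s3Runs.toSet)((remaining, run) => remaining - run.runId)
          .map(_.toList)
    }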
@@ -53,13 +53,11 @@ class ProcessManifestSpec extends Specification {
       runIds <- IO(itemsGen.sample.getOrElse(throw new RuntimeException("Couldn't generate run ids")))
       amount <- runIds.traverse(ProcessManifestSpec.addRunId[IO](manifest)).map(_.length)
       _ <- IO(println(s"$amount of run ids inserted into a table"))
-      scanned <- manifest.scan(ProcessManifestSpec.LocalTable)
+      scanned <- manifest.scan(ProcessManifestSpec.LocalTable).compile.toList

-      cleanedUpScanned = scanned.map(_.map(ProcessManifestSpec.hideDates))
+      cleanedUpScanned = scanned.map(ProcessManifestSpec.hideDates)
       cleanedUpGenerated = runIds.map(ProcessManifestSpec.hideDates)
-    } yield cleanedUpScanned must beRight.like {
-      case scannedRunIds => scannedRunIds must containTheSameElementsAs(cleanedUpGenerated)
-    }
+    } yield cleanedUpScanned must containTheSameElementsAs(cleanedUpGenerated)

     test.unsafeRunSync()
   }
@@ -78,7 +78,8 @@ object Loader {

     val action: Action[F, Unit] = for {
       _ <- preliminaryChecks(connection, config)
-      state <- ProcessManifest[F].scan(config.manifest).toAction.map(SnowflakeState.getState)
+      runIds = ProcessManifest[F].scan(config.manifest)
+      state <- SnowflakeState.getState[F](runIds).toAction
       _ <- EitherT.fromEither[F](checkFoldersStage(state.foldersToLoad, config.stageUrl))
       _ <- initWarehouse.toAction
       _ <- state.foldersToLoad.traverse_(loadFolder[F](connection, config))
@@ -314,9 +315,4 @@ object Loader {
     def toAction: Action[F, A] =
       value.attemptT.leftMap(e => Error.Runtime(e): Error)
   }
-
-  private implicit class FEitherOps[F[_]: Functor, A](value: F[Either[String, A]]) {
-    def toAction: Action[F, A] =
-      EitherT(value.map(e => e.leftMap(s => Error.Runtime(new RuntimeException(s)): Error)))
-  }
 }
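Because scan now raises failures through F instead of returning Either[String, *], the stringly-typed FEitherOps bridge deleted above has no remaining callers; the surviving toAction lifts any F[A] into the loader's Action type via attemptT. A hedged sketch of that conversion, with Error and Runtime approximating the loader's error ADT:

    import cats.data.EitherT
    import cats.effect.IO
    import cats.syntax.all._

    object ActionSketch {
      sealed trait Error
      final case class Runtime(cause: Throwable) extends Error // assumed shape

      type Action[A] = EitherT[IO, Error, A]

      // attemptT catches any Throwable raised inside IO and surfaces it on
      // the EitherT error channel, where it is widened into the Error ADT
      def toAction[A](fa: IO[A]): Action[A] =
        fa.attemptT.leftMap(t => Runtime(t): Error)
    }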
@@ -12,60 +12,53 @@
 */
 package com.snowplowanalytics.snowflake.loader

-import com.snowplowanalytics.snowflake.core.RunId
-import com.snowplowanalytics.snowflake.core.RunId._
-import com.snowplowanalytics.snowflake.loader.SnowflakeState._
+import cats.syntax.all._
+import cats.effect.Sync
+
+import fs2.Stream

 import org.joda.time.DateTime

 /**
  * Class representing current consistent state snapshot of Snowplow Snowflake Data.
  * Folders (run ids) refer to original archived folders in `enriched.archive`,
  * not folders produced by transformer
- * @param processed ordered list of processed folders
- * @param loaded ordered list of successfully loaded folders
  */
-case class SnowflakeState(processed: List[ProcessedRunId], loaded: List[LoadedRunId]) {
+import com.snowplowanalytics.snowflake.core.RunId
+import com.snowplowanalytics.snowflake.loader.SnowflakeState._

-  /** Columns that were already added by the time of state snapshot */
-  val existingColumns = loaded.flatMap(_.shredTypes).toSet
+case class SnowflakeState(foldersToLoad: List[FolderToLoad]) extends AnyVal

+object SnowflakeState {

-  private val empty: (List[FolderToLoad], Set[String]) =
-    (List.empty, existingColumns)
+  private val init: (List[RunId.ProcessedRunId], Set[String]) = (List.empty, Set.empty)

   /**
    * Calculate list of folders that have to be loaded in particular order,
    * with each having set of columns that appeared **first** in this folder
    */
-  def foldersToLoad: List[FolderToLoad] = {
-    val (toLoad, _) = processed.foldLeft(empty) { case ((loadStates, shredTypes), cur) =>
-      val newColumns = cur.shredTypes.toSet -- shredTypes
-      val loadState = FolderToLoad(cur, newColumns)
-      (loadState :: loadStates, newColumns ++ shredTypes)
+  def getState[F[_]: Sync](runIds: Stream[F, RunId]): F[SnowflakeState] = {
+    val foldersAndTypes = runIds.compile.fold(init) { case ((folders, loadedTypes), id) =>
+      if (id.toSkip) (folders, loadedTypes)
+      else id match {
+        case loaded: RunId.LoadedRunId => (folders, loaded.shredTypes.toSet ++ loadedTypes)
+        case processed: RunId.ProcessedRunId => (processed :: folders, loadedTypes)
+        case _ => (folders, loadedTypes)
+      }
     }
-    toLoad.reverse
+
+    foldersAndTypes.map(foldState.tupled)
   }
-}

-object SnowflakeState {
+  val foldState: (List[RunId.ProcessedRunId], Set[String]) => SnowflakeState = (folders, existingTypes) => {
+    val init = List.empty[FolderToLoad]
+    val foldersToLoad = folders.sortBy(_.addedAt).foldLeft(init) { case (folders, folder) =>
+      val folderTypes = folder.shredTypes.toSet -- (folders.flatMap(_.folderToLoad.shredTypes).toSet ++ existingTypes)
+      FolderToLoad(folder, folderTypes) :: folders
+    }
+    SnowflakeState(foldersToLoad.reverse)
+  }

-  /**
-   * Folder that was processed by Transformer and ready to be loaded into Snowflake,
-   * and containing set of columns that first appeared in this folder (according to
-   * manifest state)
-   * @param folderToLoad reference to folder processed by Transformer
-   * @param newColumns set of columns this processed folder brings
-   */
-  case class FolderToLoad(folderToLoad: ProcessedRunId, newColumns: Set[String])
+  /**
+   * Folder that was processed by Transformer and ready to be loaded into Snowflake,
+   * and containing set of columns that first appeared in this folder (according to
+   * manifest state)
+   * @param folderToLoad reference to folder processed by Transformer
+   * @param newColumns set of columns this processed folder brings
+   */
+  case class FolderToLoad(folderToLoad: RunId.ProcessedRunId, newColumns: Set[String])

   implicit def dateTimeOrdering: Ordering[DateTime] =
     Ordering.fromLessThan(_ isBefore _)
-
-  /** Extract state from full Snowflake manifest state */
-  def getState(runIds: List[RunId]): SnowflakeState = {
-    val sortedRunIds = runIds.sortBy(_.addedAt).filterNot(_.toSkip) // TODO: DynamoDB query
-    val processed = sortedRunIds.collect { case x: ProcessedRunId => x } // next
-    val loaded = sortedRunIds.collect { case x: LoadedRunId => x } // done
-    SnowflakeState(processed, loaded)
-  }
 }
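getState now works in two phases: a single streaming fold partitions run ids into folders still to load plus the set of already-loaded column types, and foldState then walks those folders in addedAt order, assigning each one only the columns it introduces. A simplified, self-contained illustration of that second phase (Processed and ToLoad are stand-ins, with dates omitted):

    object FoldStateSketch {
      final case class Processed(runId: String, shredTypes: List[String])
      final case class ToLoad(folder: Processed, newColumns: Set[String])

      // Assign each folder only the columns not seen in an earlier folder
      // or in the already-loaded set, threading "seen" through the fold
      def plan(folders: List[Processed], loadedTypes: Set[String]): List[ToLoad] =
        folders
          .foldLeft((List.empty[ToLoad], loadedTypes)) { case ((acc, seen), f) =>
            val fresh = f.shredTypes.toSet -- seen
            (ToLoad(f, fresh) :: acc, seen ++ fresh)
          }
          ._1
          .reverse
    }

For example, plan(List(Processed("run-1", List("a", "b")), Processed("run-2", List("b", "c"))), Set("a")) assigns run-1 the new column set Set("b") and run-2 the set Set("c").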
@@ -21,6 +21,8 @@ import org.joda.time.DateTime
 import cats.effect.{Clock, ExitCode, IO, Sync}
 import cats.effect.concurrent.Ref

+import fs2.Stream
+
 import com.snowplowanalytics.snowflake.core.{Config, ProcessManifest, RunId}
 import com.snowplowanalytics.snowflake.core.Config.S3Folder.{coerce => s3}
 import com.snowplowanalytics.snowflake.loader.ast._
@@ -214,18 +216,17 @@ object LoaderSpec {
     def markLoaded(tableName: String, runid: String): IO[Unit] =
       state.update(s => s.copy(loaded = runid :: s.loaded))

-    def scan(tableName: String): IO[Either[String, List[RunId]]] =
-      IO(Right(
-        List(
-          RunId.ProcessedRunId(
-            "enriched/good/run=2017-12-10-14-30-35",
-            DateTime.parse("2017-12-10T01:20+02:00"),
-            DateTime.parse("2017-12-10T01:20+02:00"),
-            List("contexts_com_acme_something_1"),
-            Config.S3Folder.coerce("s3://archive/run=2017-12-10-14-30-35/"), "0.2.0", false))
-      ))
-
-    def getUnprocessed(manifestTable: String, enrichedInput: Config.S3Folder): IO[Either[String, List[String]]] = ???
+    def scan(tableName: String): Stream[IO, RunId] =
+      Stream.emit(
+        RunId.ProcessedRunId(
+          "enriched/good/run=2017-12-10-14-30-35",
+          DateTime.parse("2017-12-10T01:20+02:00"),
+          DateTime.parse("2017-12-10T01:20+02:00"),
+          List("contexts_com_acme_something_1"),
+          Config.S3Folder.coerce("s3://archive/run=2017-12-10-14-30-35/"), "0.2.0", false)
+      ).covary[IO]
+
+    def getUnprocessed(manifestTable: String, enrichedInput: Config.S3Folder): IO[List[String]] = ???
     def add(tableName: String, runId: String): IO[Unit] = ???
     def markProcessed(tableName: String, runId: String, shredTypes: List[String], outputPath: String): IO[Unit] = ???
   }
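The stub swaps the old IO(Right(List(...))) for a pure single-element stream; covary only widens a pure Stream's effect type so it satisfies the new Stream[IO, RunId] signature. A tiny illustration:

    import cats.effect.IO
    import fs2.{Pure, Stream}

    object CovarySketch {
      val pure: Stream[Pure, Int] = Stream.emit(1)     // no effects yet
      val effectful: Stream[IO, Int] = pure.covary[IO] // widen effect type only
    }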
(Diffs for the remaining two changed files did not load.)
