diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index 894715cf7..613c510f9 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -12,58 +12,53 @@ */ package com.snowplowanalytics.snowplow.loader.databricks -import java.sql.Timestamp import cats.data.NonEmptyList -import io.circe.syntax._ -import doobie.Fragment -import doobie.implicits.javasql._ -import doobie.implicits._ import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration, SchemaList} import com.snowplowanalytics.snowplow.rdbloader.LoadStatements import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity} -import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Target, AtomicColumns} +import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable -import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent +import doobie.Fragment +import doobie.implicits._ +import doobie.implicits.javasql._ +import io.circe.syntax._ + +import java.sql.Timestamp object Databricks { val AlertingTempTableName = "rdb_folder_monitoring" - val ManifestName = "manifest" + val UnstructPrefix = "unstruct_event_" + val ContextsPrefix = "contexts_" def build(config: Config[StorageTarget]): Either[String, Target] = { config.storage match { case tgt: StorageTarget.Databricks => val result = new Target { - def updateTable(migration: Migration): Block = - Block(Nil, Nil, Entity.Table(tgt.schema, SchemaKey(migration.vendor, migration.name, "jsonschema", migration.to))) - - def extendTable(info: ShreddedType.Info): Option[Block] = None + + override val requiresEventsColumns: Boolean = true - def getLoadStatements(discovery: DataDiscovery): LoadStatements = - NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, getColumns(discovery))) + override def updateTable(migration: Migration): Block = + Block(Nil, Nil, Entity.Table(tgt.schema, SchemaKey(migration.vendor, migration.name, "jsonschema", migration.to))) - def getColumns(discovery: DataDiscovery): List[String] = { - val atomicColumns = AtomicColumns.Columns - val shredTypeColumns = discovery.shreddedTypes - .filterNot(_.isAtomic) - .map(getShredTypeColumn) - atomicColumns ::: shredTypeColumns - } + override def extendTable(info: ShreddedType.Info): Option[Block] = None - def getShredTypeColumn(shreddedType: ShreddedType): String = { - val shredProperty = shreddedType.getSnowplowEntity.toSdkProperty - val info = shreddedType.info - SnowplowEvent.transformSchema(shredProperty, info.vendor, info.name, info.model) + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = { + val toCopy = ColumnsToCopy.fromDiscoveredData(discovery) + val toSkip = ColumnsToSkip(getEntityColumnsPresentInDbOnly(eventTableColumns, toCopy)) + + 
NonEmptyList.one(Statement.EventsCopy(discovery.base, discovery.compression, toCopy, toSkip)) } - def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey)) + override def createTable(schemas: SchemaList): Block = Block(Nil, Nil, Entity.Table(tgt.schema, schemas.latest.schemaKey)) - def getManifest: Statement = + override def getManifest: Statement = Statement.CreateTable( - Fragment.const0(s"""CREATE TABLE IF NOT EXISTS ${qualify(ManifestName)} ( + Fragment.const0(s"""CREATE TABLE IF NOT EXISTS ${qualify(Manifest.Name)} ( | base VARCHAR(512) NOT NULL, | types VARCHAR(65535) NOT NULL, | shredding_started TIMESTAMP NOT NULL, @@ -79,7 +74,7 @@ object Databricks { |""".stripMargin) ) - def toFragment(statement: Statement): Fragment = + override def toFragment(statement: Statement): Fragment = statement match { case Statement.Select1 => sql"SELECT 1" case Statement.ReadyCheck => sql"SELECT 1" @@ -93,7 +88,7 @@ object Databricks { sql"DROP TABLE IF EXISTS $frTableName" case Statement.FoldersMinusManifest => val frTableName = Fragment.const(qualify(AlertingTempTableName)) - val frManifest = Fragment.const(qualify(ManifestName)) + val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" case Statement.FoldersCopy(source) => val frTableName = Fragment.const(qualify(AlertingTempTableName)) @@ -101,10 +96,15 @@ object Databricks { sql"""COPY INTO $frTableName FROM (SELECT _C0::VARCHAR(512) RUN_ID FROM '$frPath') FILEFORMAT = CSV"""; - case Statement.EventsCopy(path, _, columns) => - val frTableName = Fragment.const(qualify(EventsTable.MainName)) - val frPath = Fragment.const0(s"$path/output=good") - val frSelectColumns = Fragment.const0(columns.mkString(",") + ", current_timestamp() as load_tstamp") + case Statement.EventsCopy(path, _, toCopy, toSkip) => + val frTableName = Fragment.const(qualify(EventsTable.MainName)) + val frPath = Fragment.const0(path.append("output=good")) + val nonNulls = toCopy.names.map(_.value) + val nulls = toSkip.names.map(c => s"NULL AS ${c.value}") + val currentTimestamp = "current_timestamp() AS load_tstamp" + val allColumns = (nonNulls ::: nulls) :+ currentTimestamp + + val frSelectColumns = Fragment.const0(allColumns.mkString(",")) sql"""COPY INTO $frTableName FROM ( SELECT $frSelectColumns from '$frPath' @@ -123,12 +123,11 @@ object Databricks { throw new IllegalStateException("Databricks Loader does not support migrations") case _: Statement.RenameTable => throw new IllegalStateException("Databricks Loader does not support migrations") - case Statement.SetSearchPath => - throw new IllegalStateException("Databricks Loader does not support migrations") - case _: Statement.GetColumns => - throw new IllegalStateException("Databricks Loader does not support migrations") + case Statement.GetColumns(tableName) => + val qualifiedName = Fragment.const(qualify(tableName)) + sql"SHOW columns in $qualifiedName" case Statement.ManifestAdd(message) => - val tableName = Fragment.const(qualify(ManifestName)) + val tableName = Fragment.const(qualify(Manifest.Name)) val types = message.types.asJson.noSpaces sql"""INSERT INTO $tableName (base, types, shredding_started, shredding_completed, @@ -148,7 +147,7 @@ object Databricks { base, types, shredding_started, shredding_completed, min_collector_tstamp, max_collector_tstamp, compression, processor_artifact, processor_version, count_good - FROM ${Fragment.const0(qualify(ManifestName))} WHERE base = $base""" + 
FROM ${Fragment.const0(qualify(Manifest.Name))} WHERE base = $base""" case Statement.AddLoadTstampColumn => throw new IllegalStateException("Databricks Loader does not support load_tstamp column") case Statement.CreateTable(ddl) => @@ -162,14 +161,18 @@ object Databricks { throw new IllegalStateException("Databricks Loader does not support migrations") } - def qualify(tableName: String): String = + private def qualify(tableName: String): String = s"${tgt.catalog}.${tgt.schema}.$tableName" } Right(result) case other => Left(s"Invalid State: trying to build Databricks interpreter with unrecognized config (${other.driver} driver)") } - } + private def getEntityColumnsPresentInDbOnly(eventTableColumns: EventTableColumns, toCopy: ColumnsToCopy) = { + eventTableColumns + .filter(name => name.value.startsWith(UnstructPrefix) || name.value.startsWith(ContextsPrefix)) + .diff(toCopy.names) + } } diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala new file mode 100644 index 000000000..ace24f91b --- /dev/null +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2012-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */
+package com.snowplowanalytics.snowplow.loader.databricks
+
+import cats.data.NonEmptyList
+import com.snowplowanalytics.snowplow.rdbloader.common.S3
+import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
+import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.common.config.Region
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity
+import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
+import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnName, ColumnsToCopy, ColumnsToSkip}
+import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Target}
+
+import scala.concurrent.duration.DurationInt
+import org.specs2.mutable.Specification
+
+
+class DatabricksSpec extends Specification {
+  import DatabricksSpec._
+
+  "getLoadStatements" should {
+
+    "create LoadStatements with columns to copy and columns to skip" in {
+
+      val eventsColumns = List(
+        "unstruct_event_com_acme_aaa_1",
+        "unstruct_event_com_acme_bbb_1",
+        "contexts_com_acme_xxx_1",
+        "contexts_com_acme_yyy_1",
+        "not_a_snowplow_column"
+      ).map(ColumnName)
+
+      val shreddedTypes = List(
+        ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "aaa", 1, SnowplowEntity.SelfDescribingEvent)),
+        ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "ccc", 1, SnowplowEntity.SelfDescribingEvent)),
+        ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "yyy", 1, SnowplowEntity.Context)),
+        ShreddedType.Widerow(ShreddedType.Info(baseFolder, "com_acme", "zzz", 1, SnowplowEntity.Context))
+      )
+
+      val discovery = DataDiscovery(baseFolder, shreddedTypes, Compression.Gzip)
+
+      target.getLoadStatements(discovery, eventsColumns) should be like {
+        case NonEmptyList(Statement.EventsCopy(path, compression, columnsToCopy, columnsToSkip), Nil) =>
+          path must beEqualTo(baseFolder)
+          compression must beEqualTo(Compression.Gzip)
+
+          columnsToCopy.names must contain(allOf(
+            ColumnName("unstruct_event_com_acme_aaa_1"),
+            ColumnName("unstruct_event_com_acme_ccc_1"),
+            ColumnName("contexts_com_acme_yyy_1"),
+            ColumnName("contexts_com_acme_zzz_1"),
+          ))
+
+          columnsToCopy.names must not contain(ColumnName("unstruct_event_com_acme_bbb_1"))
+          columnsToCopy.names must not contain(ColumnName("contexts_com_acme_xxx_1"))
+          columnsToCopy.names must not contain(ColumnName("not_a_snowplow_column"))
+
+          columnsToSkip.names must beEqualTo(List(
+            ColumnName("unstruct_event_com_acme_bbb_1"),
+            ColumnName("contexts_com_acme_xxx_1"),
+          ))
+      }
+    }
+  }
+
+  "toFragment" should {
+    "create sql for loading" in {
+      val toCopy = ColumnsToCopy(List(
+        ColumnName("app_id"),
+        ColumnName("unstruct_event_com_acme_aaa_1"),
+        ColumnName("contexts_com_acme_xxx_1")
+      ))
+      val toSkip = ColumnsToSkip(List(
+        ColumnName("unstruct_event_com_acme_bbb_1"),
+        ColumnName("contexts_com_acme_yyy_1"),
+      ))
+      val statement = Statement.EventsCopy(baseFolder, Compression.Gzip, toCopy, toSkip)
+
+      target.toFragment(statement).toString must beLike { case sql =>
+        sql must contain("SELECT app_id,unstruct_event_com_acme_aaa_1,contexts_com_acme_xxx_1,NULL AS unstruct_event_com_acme_bbb_1,NULL AS contexts_com_acme_yyy_1,current_timestamp() AS load_tstamp from 's3://somewhere/path/output=good/'")
+      }
+    }
+  }
+}
+
+object DatabricksSpec {
+
+  val baseFolder: S3.Folder =
+    S3.Folder.coerce("s3://somewhere/path")
+
+  val target: Target = Databricks.build(Config(
+    Region("eu-central-1"),
+    None,
+ Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None), + "my-queue.fifo", + None, + StorageTarget.Databricks( + "host", + "hive_metastore", + "snowplow", + 443, + "some/path", + StorageTarget.PasswordConfig.PlainText("xxx"), + None, + "useragent" + ), + Config.Schedules(Nil), + Config.Timeouts(1.minute, 1.minute, 1.minute), + Config.Retries(Config.Strategy.Constant, None, 1.minute, None), + Config.Retries(Config.Strategy.Constant, None, 1.minute, None) + )).right.get + +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala index 2ab0e0acd..803f0ea32 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala @@ -18,6 +18,7 @@ import cats.implicits._ import cats.effect.{Clock, Concurrent, MonadThrow, Timer} import fs2.Stream import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns._ import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, HealthCheck, Manifest, Statement, Control => DbControl} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, NoOperation, Retries} import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, Cache, DAO, FolderMonitoring, Iglu, Logging, Monitoring, StateMonitoring, Transaction} @@ -87,8 +88,8 @@ object Loader { * A primary loading processing, pulling information from discovery streams * (SQS and retry queue) and performing the load operation itself */ - def loadStream[F[_]: Transaction[*[_], C]: Concurrent: AWS: Iglu: Cache: Logging: Timer: Monitoring, - C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]): Stream[F, Unit] = { + private def loadStream[F[_]: Transaction[*[_], C]: Concurrent: AWS: Iglu: Cache: Logging: Timer: Monitoring, + C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]): Stream[F, Unit] = { val sqsDiscovery: DiscoveryStream[F] = DataDiscovery.discover[F](config, control.incrementMessages, control.isBusy) val retryDiscovery: DiscoveryStream[F] = @@ -105,9 +106,9 @@ object Loader { * over to `Load`. A primary function handling the global state - everything * downstream has access only to `F` actions, instead of whole `Control` object */ - def processDiscovery[F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring, - C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]) - (discovery: DataDiscovery.WithOrigin): F[Unit] = { + private def processDiscovery[F[_]: Transaction[*[_], C]: Concurrent: Iglu: Logging: Timer: Monitoring, + C[_]: DAO: MonadThrow: Logging](config: Config[StorageTarget], control: Control[F]) + (discovery: DataDiscovery.WithOrigin): F[Unit] = { val folder = discovery.origin.base val busy = (control.makeBusy: MakeBusy[F]).apply(folder) val backgroundCheck: F[Unit] => F[Unit] = @@ -142,15 +143,15 @@ object Loader { loading.handleErrorWith(reportLoadFailure[F](discovery, addFailure)) } - def addLoadTstampColumn[F[_]: DAO: Monad: Logging](targetConfig: StorageTarget): F[Unit] = + private def addLoadTstampColumn[F[_]: DAO: Monad: Logging](targetConfig: StorageTarget): F[Unit] = targetConfig match { // Adding load_tstamp column explicitly is not needed due to merge schema // feature of Databricks. 
It will create missing column itself. case _: StorageTarget.Databricks => Monad[F].unit case _ => for { - columns <- DbControl.getColumns[F](EventsTable.MainName) - _ <- if (columns.map(_.toLowerCase).contains(AtomicColumns.ColumnsWithDefault.LoadTstamp)) + allColumns <- DbControl.getColumns[F](EventsTable.MainName) + _ <- if (loadTstampColumnExist(allColumns)) Logging[F].info("load_tstamp column already exists") else DAO[F].executeUpdate(Statement.AddLoadTstampColumn, DAO.Purpose.NonLoading).void *> @@ -158,6 +159,12 @@ object Loader { } yield () } + private def loadTstampColumnExist(eventTableColumns: EventTableColumns) = { + eventTableColumns + .map(_.value.toLowerCase) + .contains(AtomicColumns.ColumnsWithDefault.LoadTstamp.value) + } + /** * Handle a failure during loading. * `Load.getTransaction` can fail only in one "expected" way - if the folder is already loaded @@ -168,9 +175,9 @@ object Loader { * @param discovery the original discovery * @param error the actual error, typically `SQLException` */ - def reportLoadFailure[F[_]: Logging: Monitoring: Monad](discovery: DataDiscovery.WithOrigin, - addFailure: Throwable => F[Boolean]) - (error: Throwable): F[Unit] = { + private def reportLoadFailure[F[_]: Logging: Monitoring: Monad](discovery: DataDiscovery.WithOrigin, + addFailure: Throwable => F[Boolean]) + (error: Throwable): F[Unit] = { val message = error match { case e: SQLException => s"${error.getMessage} - SqlState: ${e.getSQLState}" case _ => Option(error.getMessage).getOrElse(error.toString) @@ -184,7 +191,7 @@ object Loader { } /** Last level of failure handling, called when non-loading stream fail. Called on an application crash */ - def reportFatal[F[_]: Apply: Logging: Monitoring]: PartialFunction[Throwable, F[Unit]] = { + private def reportFatal[F[_]: Apply: Logging: Monitoring]: PartialFunction[Throwable, F[Unit]] = { case error => Logging[F].error("Loader shutting down") *> Monitoring[F].alert(Monitoring.AlertPayload.error(error.toString)) *> diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AtomicColumns.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AtomicColumns.scala index 8a91f6752..2e30509a2 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AtomicColumns.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/AtomicColumns.scala @@ -1,12 +1,14 @@ package com.snowplowanalytics.snowplow.rdbloader.db +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.ColumnName + object AtomicColumns { object ColumnsWithDefault { - val LoadTstamp = "load_tstamp" + val LoadTstamp = ColumnName("load_tstamp") } - val Columns: List[String] = List( + val Columns: List[ColumnName] = List( "app_id", "platform", "etl_tstamp", @@ -135,5 +137,5 @@ object AtomicColumns { "event_version", "event_fingerprint", "true_tstamp", - ) + ).map(ColumnName) } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Columns.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Columns.scala new file mode 100644 index 000000000..072f632d4 --- /dev/null +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Columns.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2012-2021 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. 
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.rdbloader.db + +import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} + +object Columns { + + final case class ColumnName(value: String) extends AnyVal + type EventTableColumns = List[ColumnName] + + final case class ColumnsToCopy(names: List[ColumnName]) extends AnyVal + + object ColumnsToCopy { + def fromDiscoveredData(discovery: DataDiscovery): ColumnsToCopy = { + val shredTypeColumns = discovery.shreddedTypes + .filterNot(_.isAtomic) + .map(getShredTypeColumn) + ColumnsToCopy(AtomicColumns.Columns ::: shredTypeColumns) + } + + private def getShredTypeColumn(shreddedType: ShreddedType): ColumnName = { + val shredProperty = shreddedType.getSnowplowEntity.toSdkProperty + val info = shreddedType.info + ColumnName(SnowplowEvent.transformSchema(shredProperty, info.vendor, info.name, info.model)) + } + } + + final case class ColumnsToSkip(names: List[ColumnName]) extends AnyVal + + object ColumnsToSkip { + val none = ColumnsToSkip(List.empty) + } + +} diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Control.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Control.scala index cab8d4b42..f2964a84c 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Control.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Control.scala @@ -14,7 +14,7 @@ package com.snowplowanalytics.snowplow.rdbloader.db import cats.{Functor, Monad} import cats.implicits._ - +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.ColumnName import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO /** Set of common functions to control DB entities */ @@ -25,9 +25,7 @@ object Control { def tableExists[F[_]: DAO](tableName: String): F[Boolean] = DAO[F].executeQuery[Boolean](Statement.TableExists(tableName)) - def getColumns[F[_]: Monad: DAO](tableName: String): F[List[String]] = - for { - _ <- DAO[F].executeUpdate(Statement.SetSearchPath, DAO.Purpose.NonLoading) - columns <- DAO[F].executeQueryList[String](Statement.GetColumns(tableName)) - } yield columns + def getColumns[F[_]: Monad: DAO](tableName: String): F[List[ColumnName]] = + DAO[F].executeQueryList[String](Statement.GetColumns(tableName)) + .map(_.map(ColumnName)) } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala index 9cc5a8272..812b81a9b 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Manifest.scala @@ -10,6 +10,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.S3 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget import 
com.snowplowanalytics.snowplow.rdbloader.config.StorageTarget.{Databricks, Redshift} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.ColumnName import com.snowplowanalytics.snowplow.rdbloader.dsl.{AWS, DAO, Logging, Transaction} object Manifest { @@ -26,7 +27,7 @@ object Manifest { "commit_tstamp", "event_count", "shredded_cardinality" - ) + ).map(ColumnName) def initialize[F[_]: MonadThrow: Logging: Timer: AWS, C[_]: DAO: Monad]( target: StorageTarget @@ -51,8 +52,8 @@ object Manifest { for { exists <- Control.tableExists[F](Name) status <- if (exists) for { - columns <- Control.getColumns[F](Name) - legacy = columns.toSet === LegacyColumns.toSet + existingTableColumns <- Control.getColumns[F](Name) + legacy = existingTableColumns.toSet === LegacyColumns.toSet status <- if (legacy) Control.renameTable[F](Name, LegacyName) *> create[F].as[InitStatus](InitStatus.Migrated) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 6c0cc2a9a..ebf444fab 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -12,22 +12,18 @@ */ package com.snowplowanalytics.snowplow.rdbloader.db -import cats.{~>, Applicative, Monad, MonadThrow} import cats.data.EitherT import cats.implicits._ - -import doobie.Fragment - -import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaMap, SchemaKey} - +import cats.{Applicative, Monad, MonadThrow, ~>} +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer} import com.snowplowanalytics.iglu.schemaddl.StringUtils -import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList -import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration => SchemaMigration} - -import com.snowplowanalytics.snowplow.rdbloader.{readSchemaKey, LoaderError, LoaderAction} +import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList, Migration => SchemaMigration} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.ColumnName +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, DiscoveryFailure, ShreddedType} +import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable -import com.snowplowanalytics.snowplow.rdbloader.discovery.{DiscoveryFailure, DataDiscovery, ShreddedType} -import com.snowplowanalytics.snowplow.rdbloader.dsl.{Logging, DAO, Transaction, Iglu} +import com.snowplowanalytics.snowplow.rdbloader.{LoaderAction, LoaderError, readSchemaKey} +import doobie.Fragment /** @@ -179,7 +175,7 @@ object Migration { schemaKey <- getVersion[F](tableName) matches = schemas.latest.schemaKey == schemaKey block <- if (matches) emptyBlock[F] - else Control.getColumns[F](tableName).flatMap { (columns: List[String]) => + else Control.getColumns[F](tableName).flatMap { (columns: List[ColumnName]) => migrateTable(target, schemaKey, columns, schemas) match { case Left(migrationError) => MonadThrow[F].raiseError[Option[Block]](migrationError) case Right(block) => MonadThrow[F].pure(block.some) @@ -195,7 +191,7 @@ object Migration { } } - def migrateTable(target: Target, current: SchemaKey, columns: List[String], schemaList: SchemaList): Either[LoaderError, Block] = { + def migrateTable(target: Target, current: SchemaKey, columns: List[ColumnName], schemaList: 
SchemaList): Either[LoaderError, Block] = { schemaList match { case s: SchemaList.Full => val migrations = s.extractSegments.map(SchemaMigration.fromSegment) @@ -207,7 +203,7 @@ object Migration { DiscoveryFailure.IgluError(message).toLoaderError.asLeft } case s: SchemaList.Single => - val message = s"Illegal State: updateTable called for a table with known single schema [${s.schema.self.schemaKey.toSchemaUri}]\ncolumns: ${columns.mkString(", ")}\nstate: $schemaList" + val message = s"Illegal State: updateTable called for a table with known single schema [${s.schema.self.schemaKey.toSchemaUri}]\ncolumns: ${columns.map(_.value).mkString(", ")}\nstate: $schemaList" LoaderError.MigrationError(message).asLeft } } diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala index ad0b1036a..1a17048e6 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala @@ -13,10 +13,11 @@ package com.snowplowanalytics.snowplow.rdbloader.db import doobie.Fragment - -import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage} +import com.snowplowanalytics.snowplow.rdbloader.common.{LoaderMessage, S3} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip} import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType +import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable /** @@ -53,8 +54,11 @@ object Statement { case class FoldersCopy(source: S3.Folder) extends Statement // Loading - case class EventsCopy(path: S3.Folder, compression: Compression, columns: List[String]) extends Statement with Loading { - def table: String = "events" + case class EventsCopy(path: S3.Folder, + compression: Compression, + columnsToCopy: ColumnsToCopy, + columnsToSkip: ColumnsToSkip) extends Statement with Loading { + def table: String = EventsTable.MainName } case class ShreddedCopy(shreddedType: ShreddedType, compression: Compression) extends Statement with Loading { def table: String = shreddedType.info.getName @@ -69,7 +73,6 @@ object Statement { case class GetVersion(tableName: String) extends Statement case class RenameTable(from: String, to: String) extends Statement - case object SetSearchPath extends Statement case class GetColumns(tableName: String) extends Statement case class CommentOn(tableName: String, comment: String) extends Statement case object AddLoadTstampColumn extends Statement diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala index 61abeea38..7b489ad33 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Target.scala @@ -12,13 +12,12 @@ */ package com.snowplowanalytics.snowplow.rdbloader.db -import doobie.Fragment - import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList, Migration => SchemaMigration} - import com.snowplowanalytics.snowplow.rdbloader.LoadStatements +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.EventTableColumns import 
com.snowplowanalytics.snowplow.rdbloader.db.Migration.Block
 import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
+import doobie.Fragment

 /**
  * Target represents all DB-specific logic and commands
@@ -35,8 +34,11 @@ trait Target {
    * Transform `DataDiscovery` into `LoadStatements`
    * The statements could be either single statement (only `events` table)
    * or multi-statement (`events` plus shredded types)
+   * @param discovery discovered folder with its shredded types, used to derive the columns to copy
+   * @param eventTableColumns columns currently present in the warehouse events table, empty unless `requiresEventsColumns` is true
    */
-  def getLoadStatements(discovery: DataDiscovery): LoadStatements
+  def getLoadStatements(discovery: DataDiscovery,
+                        eventTableColumns: EventTableColumns): LoadStatements

   /** Get DDL of a manifest table */
   def getManifest: Statement
@@ -49,4 +51,7 @@ trait Target {
   /** Add a new column into `events`, i.e. extend a wide row. Unlike `updateTable` it always operates on `events` table */
   def extendTable(info: ShreddedType.Info): Option[Block]
+
+  /** Whether the target needs to know existing columns in the events table */
+  def requiresEventsColumns: Boolean
 }
diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
index c0f34d933..b5415f980 100644
--- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
+++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/loading/Load.scala
@@ -23,7 +23,7 @@ import cats.effect.{Timer, Clock}
 import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
 import com.snowplowanalytics.snowplow.rdbloader.common.S3
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget }
-import com.snowplowanalytics.snowplow.rdbloader.db.{ Migration, Manifest }
+import com.snowplowanalytics.snowplow.rdbloader.db.{ Control, Migration, Manifest }
 import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
 import com.snowplowanalytics.snowplow.rdbloader.dsl.{Iglu, Transaction, Logging, Monitoring, DAO}
 import com.snowplowanalytics.snowplow.rdbloader.dsl.metrics.Metrics
@@ -131,7 +131,8 @@ object Load {
                 discovery: DataDiscovery): F[Unit] =
     for {
       _ <- Logging[F].info(s"Loading ${discovery.base}")
-      _ <- DAO[F].target.getLoadStatements(discovery).traverse_ { statement =>
+      existingEventTableColumns <- if (DAO[F].target.requiresEventsColumns) Control.getColumns[F](EventsTable.MainName) else Nil.pure[F]
+      _ <- DAO[F].target.getLoadStatements(discovery, existingEventTableColumns).traverse_ { statement =>
              Logging[F].info(statement.title) *>
                setLoading(statement.table) *>
                DAO[F].executeUpdate(statement, DAO.Purpose.Loading).void
diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
index a3708cbf2..2e0152b92 100644
--- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
+++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala
@@ -13,29 +13,23 @@
 package com.snowplowanalytics.snowplow.rdbloader.loading

 import java.time.Instant
-
 import scala.concurrent.duration.FiniteDuration
-
 import cats.syntax.option._
-
 import cats.effect.Timer
-
-import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey}
-
+import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}
 import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, 
SpecHelpers} -import com.snowplowanalytics.snowplow.rdbloader.common.{S3, LoaderMessage} -import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Timestamps, Processor, TypesInfo} +import com.snowplowanalytics.snowplow.rdbloader.common.{LoaderMessage, S3} +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Processor, Timestamps, TypesInfo} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression import com.snowplowanalytics.snowplow.rdbloader.common.config.Semver import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} -import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Transaction, Iglu, Logging} -import com.snowplowanalytics.snowplow.rdbloader.db.{Statement, Manifest} - +import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} +import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement} import org.specs2.mutable.Specification - import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers._ +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip} import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry -import com.snowplowanalytics.snowplow.rdbloader.test.{PureDAO, Pure, PureOps, TestState, PureIglu, PureTransaction, PureLogging, PureTimer} +import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureDAO, PureIglu, PureLogging, PureOps, PureTimer, PureTransaction, TestState} class LoadSpec extends Specification { import LoadSpec.{isBeforeFirstCommit, failCommit} @@ -58,7 +52,7 @@ class LoadSpec extends Specification { PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), @@ -111,13 +105,13 @@ class LoadSpec extends Specification { PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), PureTransaction.RollbackMessage, LogEntry.Message("SLEEP 30000000000 nanoseconds"), PureTransaction.StartMessage, LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), - LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, List.empty)), + LogEntry.Sql(Statement.EventsCopy("s3://shredded/base/".dir,Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty))), LogEntry.Sql(Statement.ShreddedCopy(info,Compression.Gzip)), LogEntry.Sql(Statement.ManifestAdd(LoadSpec.dataDiscoveryWithOrigin.origin.toManifestItem)), LogEntry.Sql(Statement.ManifestGet("s3://shredded/base/".dir)), diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index efecc85c1..0972a6480 100644 --- 
a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -14,19 +14,16 @@ package com.snowplowanalytics.snowplow.rdbloader.test import cats.data.NonEmptyList import cats.implicits._ - -import doobie.{Read, Fragment} - -import com.snowplowanalytics.iglu.core.{SchemaVer, SchemaKey} - +import doobie.{Fragment, Read} +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} import com.snowplowanalytics.iglu.schemaddl.StringUtils import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList, Migration => SchemaMigration} import com.snowplowanalytics.iglu.schemaddl.redshift.generators.DdlGenerator - -import com.snowplowanalytics.snowplow.rdbloader.{LoaderError, LoadStatements} +import com.snowplowanalytics.snowplow.rdbloader.{LoadStatements, LoaderError} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression -import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Item, Block} -import com.snowplowanalytics.snowplow.rdbloader.db.{Target, Statement, Migration} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} +import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Item} +import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO @@ -93,9 +90,9 @@ object PureDAO { def toFragment(statement: Statement): Fragment = Fragment.const0(statement.toString) - def getLoadStatements(discovery: DataDiscovery): LoadStatements = + def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = NonEmptyList( - Statement.EventsCopy(discovery.base, Compression.Gzip, List.empty), + Statement.EventsCopy(discovery.base, Compression.Gzip, ColumnsToCopy(List.empty), ColumnsToSkip(List.empty)), discovery.shreddedTypes.map { shredded => Statement.ShreddedCopy(shredded, Compression.Gzip) } @@ -117,5 +114,7 @@ object PureDAO { val entity = Migration.Entity.Table("public", schemas.latest.schemaKey) Block(Nil, List(Item.CreateTable(Fragment.const0(createTable.toDdl))), entity) } + + def requiresEventsColumns: Boolean = false } } diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index 434fb2af3..ec87137fd 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -12,31 +12,27 @@ */ package com.snowplowanalytics.snowplow.loader.redshift -import java.sql.Timestamp - import cats.data.NonEmptyList -import cats.implicits._ - -import io.circe.syntax._ -import doobie.Fragment -import doobie.implicits._ -import doobie.implicits.javasql._ - import com.snowplowanalytics.iglu.core.SchemaKey - import com.snowplowanalytics.iglu.schemaddl.StringUtils import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration, SchemaList} import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlFile, DdlGenerator, MigrationGenerator} - import 
com.snowplowanalytics.snowplow.rdbloader.LoadStatements import com.snowplowanalytics.snowplow.rdbloader.common.Common import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} -import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Entity, Item, Block, NoPreStatements, NoStatements} -import com.snowplowanalytics.snowplow.rdbloader.db.{ Statement, Target, AtomicColumns } -import com.snowplowanalytics.snowplow.rdbloader.discovery.{ShreddedType, DataDiscovery} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} +import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements} +import com.snowplowanalytics.snowplow.rdbloader.db.{AtomicColumns, Manifest, Statement, Target} +import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable +import doobie.Fragment +import doobie.implicits._ +import doobie.implicits.javasql._ +import io.circe.syntax._ + +import java.sql.Timestamp object Redshift { @@ -48,7 +44,10 @@ object Redshift { config.storage match { case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _) => val result = new Target { - def updateTable(migration: Migration): Block = { + + override val requiresEventsColumns: Boolean = false + + override def updateTable(migration: Migration): Block = { val ddlFile = MigrationGenerator.generateMigration(migration, 4096, Some(schema)) val (preTransaction, inTransaction) = ddlFile.statements.foldLeft((NoPreStatements, NoStatements)) { @@ -67,31 +66,30 @@ object Redshift { Block(preTransaction.reverse, inTransaction.reverse, Entity.Table(schema, target)) } - def extendTable(info: ShreddedType.Info): Option[Block] = + override def extendTable(info: ShreddedType.Info): Option[Block] = throw new IllegalStateException("Redshift Loader does not support loading wide row") - def getLoadStatements(discovery: DataDiscovery): LoadStatements = { + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = { val shreddedStatements = discovery .shreddedTypes .filterNot(_.isAtomic) .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression)) - // Since EventsCopy is used only for atomic events in Redshift Loader, - // 'columns' field of EventsCopy isn't needed therefore it is set to empty list. 
- val atomic = Statement.EventsCopy(discovery.base, discovery.compression, List.empty) + + val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none) NonEmptyList(atomic, shreddedStatements) } - def createTable(schemas: SchemaList): Block = { + override def createTable(schemas: SchemaList): Block = { val subschemas = FlatSchema.extractProperties(schemas) val tableName = StringUtils.getTableName(schemas.latest) val createTable = DdlGenerator.generateTableDdl(subschemas, tableName, Some(schema), 4096, false) Block(Nil, List(Item.CreateTable(Fragment.const0(createTable.toDdl))), Entity.Table(schema, schemas.latest.schemaKey)) } - def getManifest: Statement = + override def getManifest: Statement = Statement.CreateTable(Fragment.const0(getManifestDef(schema).render)) - def toFragment(statement: Statement): Fragment = + override def toFragment(statement: Statement): Fragment = statement match { case Statement.Select1 => sql"SELECT 1" case Statement.ReadyCheck => sql"SELECT 1" @@ -111,7 +109,7 @@ object Redshift { val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn") val frPath = Fragment.const0(source) sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'" - case Statement.EventsCopy(path, compression, _) => + case Statement.EventsCopy(path, compression, columnsToCopy, _) => // For some reasons Redshift JDBC doesn't handle interpolation in COPY statements val frTableName = Fragment.const(EventsTable.withSchema(schema)) val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType)) @@ -119,7 +117,7 @@ object Redshift { val frRegion = Fragment.const0(config.region.name) val frMaxError = Fragment.const0(maxError.toString) val frCompression = getCompressionFormat(compression) - val frColumns = Fragment.const0(AtomicColumns.Columns.mkString(",")) + val frColumns = Fragment.const0(columnsToCopy.names.map(_.value).mkString(",")) sql"""COPY $frTableName ($frColumns) FROM '$frPath' | CREDENTIALS '$frRoleArn' @@ -191,13 +189,11 @@ object Redshift { val ddl = DdlFile(List(AlterTable(qualify(from), RenameTo(to)))) val str = ddl.render.split("\n").filterNot(l => l.startsWith("--") || l.isBlank).mkString("\n") Fragment.const0(str) - case Statement.SetSearchPath => - Fragment.const0(s"SET search_path TO ${schema}") case Statement.GetColumns(tableName) => - val fullName = qualify(tableName) - sql"""SELECT "column" FROM PG_TABLE_DEF WHERE tablename = $fullName AND schemaname = $schema""" + sql"""SELECT column_name FROM information_schema.columns + WHERE table_name = $tableName and table_schema = $schema""" case Statement.ManifestAdd(message) => - val tableName = Fragment.const(qualify(ManifestName)) + val tableName = Fragment.const(qualify(Manifest.Name)) val types = message.types.asJson.noSpaces sql"""INSERT INTO $tableName (base, types, shredding_started, shredding_completed, @@ -228,7 +224,7 @@ object Redshift { ddl } - def qualify(tableName: String): String = + private def qualify(tableName: String): String = s"$schema.$tableName" } @@ -244,8 +240,6 @@ object Redshift { case Compression.None => Fragment.empty } - val ManifestName = "manifest" - val ManifestColumns = List( Column("base", RedshiftVarchar(512), Set(CompressionEncoding(ZstdEncoding)),Set(Nullability(NotNull),KeyConstaint(PrimaryKey))), Column("types",RedshiftVarchar(65535),Set(CompressionEncoding(ZstdEncoding)),Set(Nullability(NotNull))), @@ -266,7 +260,7 @@ object Redshift { /** Add `schema` to otherwise static 
definition of manifest table */ def getManifestDef(schema: String): CreateTable = CreateTable( - s"$schema.$ManifestName", + s"$schema.${Manifest.Name}", ManifestColumns, Set.empty, Set(Diststyle(Key), DistKeyTable("base"), SortKeyTable(None,NonEmptyList.one("ingestion_tstamp"))) diff --git a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala index f59853598..68f075789 100644 --- a/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala +++ b/modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala @@ -12,32 +12,26 @@ */ package com.snowplowanalytics.snowplow.loader.snowflake -import java.sql.Timestamp - import cats.data.NonEmptyList - -import doobie.Fragment -import doobie.implicits._ -import doobie.implicits.javasql._ - -import io.circe.syntax._ - import com.snowplowanalytics.iglu.core.SchemaKey - import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration, SchemaList} - +import com.snowplowanalytics.snowplow.loader.snowflake.ast.SnowflakeDatatype +import com.snowplowanalytics.snowplow.loader.snowflake.ast.Statements.AddColumn +import com.snowplowanalytics.snowplow.loader.snowflake.db.SnowflakeManifest import com.snowplowanalytics.snowplow.rdbloader.LoadStatements -import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget} +import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns} +import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item} +import com.snowplowanalytics.snowplow.rdbloader.db.{Manifest, Statement, Target} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} -import com.snowplowanalytics.snowplow.rdbloader.db.{Target, Statement, AtomicColumns} -import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Item, Entity, Block} -import com.snowplowanalytics.snowplow.rdbloader.db.Manifest -import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity -import com.snowplowanalytics.snowplow.loader.snowflake.ast.SnowflakeDatatype -import com.snowplowanalytics.snowplow.loader.snowflake.db.SnowflakeManifest -import com.snowplowanalytics.snowplow.loader.snowflake.ast.Statements.AddColumn -import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent +import com.snowplowanalytics.snowplow.rdbloader.loading.EventsTable +import doobie.Fragment +import doobie.implicits._ +import doobie.implicits.javasql._ +import io.circe.syntax._ + +import java.sql.Timestamp object Snowflake { @@ -50,13 +44,16 @@ object Snowflake { config.storage match { case StorageTarget.Snowflake(_, _, _, _, _, warehouse, _, schema, stage, _, monitoringStage, onError, _) => val result = new Target { - def updateTable(migration: Migration): Block = { + + override val requiresEventsColumns: Boolean = false + + override def updateTable(migration: Migration): Block = { val target = SchemaKey(migration.vendor, migration.name, "jsonschema", migration.to) val entity = Entity.Table(schema, target) Block(Nil, Nil, entity) } - def extendTable(info: ShreddedType.Info): Option[Block] = { + override def extendTable(info: ShreddedType.Info): Option[Block] = { val 
isContext = info.entity == SnowplowEntity.Context val columnType = if (isContext) SnowflakeDatatype.JsonArray else SnowflakeDatatype.JsonObject val columnName = info.getNameFull @@ -65,60 +62,53 @@ object Snowflake { Some(Block(List(addColumn), Nil, Entity.Column(info))) } - def getLoadStatements(discovery: DataDiscovery): LoadStatements = - NonEmptyList(Statement.EventsCopy(discovery.base, discovery.compression, getColumns(discovery)), Nil) + override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns): LoadStatements = + NonEmptyList.one( + Statement.EventsCopy( + discovery.base, + discovery.compression, + ColumnsToCopy.fromDiscoveredData(discovery), + ColumnsToSkip.none + ) + ) // Technically, Snowflake Loader cannot create new tables - def createTable(schemas: SchemaList): Block = { + override def createTable(schemas: SchemaList): Block = { val entity = Entity.Table(schema, schemas.latest.schemaKey) Block(Nil, Nil, entity) } - def getManifest: Statement = + override def getManifest: Statement = Statement.CreateTable(SnowflakeManifest.getManifestDef(schema).toFragment) - def getColumns(discovery: DataDiscovery): List[String] = { - val atomicColumns = AtomicColumns.Columns - val shredTypeColumns = discovery.shreddedTypes - .filterNot(_.isAtomic) - .map(getShredTypeColumn) - atomicColumns ::: shredTypeColumns - } - - def getShredTypeColumn(shreddedType: ShreddedType): String = { - val shredProperty = shreddedType.getSnowplowEntity.toSdkProperty - val info = shreddedType.info - SnowplowEvent.transformSchema(shredProperty, info.vendor, info.name, info.model) - } - - def toFragment(statement: Statement): Fragment = + override def toFragment(statement: Statement): Fragment = statement match { case Statement.Select1 => sql"SELECT 1" // OK case Statement.ReadyCheck => sql"ALTER WAREHOUSE ${Fragment.const0(warehouse)} RESUME IF SUSPENDED" case Statement.CreateAlertingTempTable => // OK - val frTableName = Fragment.const(AlertingTempTableName) + val frTableName = Fragment.const(qualify(AlertingTempTableName)) sql"CREATE TEMPORARY TABLE $frTableName ( run_id VARCHAR )" case Statement.DropAlertingTempTable => - val frTableName = Fragment.const(AlertingTempTableName) + val frTableName = Fragment.const(qualify(AlertingTempTableName)) sql"DROP TABLE IF EXISTS $frTableName" case Statement.FoldersMinusManifest => - val frTableName = Fragment.const(AlertingTempTableName) - val frManifest = Fragment.const(s"${schema}.manifest") + val frTableName = Fragment.const(qualify(AlertingTempTableName)) + val frManifest = Fragment.const(qualify(Manifest.Name)) sql"SELECT run_id FROM $frTableName MINUS SELECT base FROM $frManifest" case Statement.FoldersCopy(source) => - val frTableName = Fragment.const(AlertingTempTableName) + val frTableName = Fragment.const(qualify(AlertingTempTableName)) // This is validated on config decoding stage val stageName = monitoringStage.getOrElse(throw new IllegalStateException("Folder Monitoring is launched without monitoring stage being provided")) val frPath = Fragment.const0(s"@$schema.$stageName/${source.folderName}") sql"COPY INTO $frTableName FROM $frPath FILE_FORMAT = (TYPE = CSV)" - case Statement.EventsCopy(path, _, columns) => { - def columnsForCopy: String = columns.mkString(",") + ",load_tstamp" - def columnsForSelect: String = columns.map(c => s"$$1:$c").mkString(",") + ",current_timestamp()" + case Statement.EventsCopy(path, _, columnsToCopy, _) => { + def columnsForCopy: String = columnsToCopy.names.map(_.value).mkString(",") + 
",load_tstamp" + def columnsForSelect: String = columnsToCopy.names.map(c => s"$$1:${c.value}").mkString(",") + ",current_timestamp()" - val frPath = Fragment.const0(s"@$schema.$stage/${path.folderName}/output=good/") - val frCopy = Fragment.const0(s"$schema.${EventsTable.MainName}($columnsForCopy)") + val frPath = Fragment.const0(s"@${qualify(stage)}/${path.folderName}/output=good/") + val frCopy = Fragment.const0(s"${qualify(EventsTable.MainName)}($columnsForCopy)") val frSelectColumns = Fragment.const0(columnsForSelect) val frOnError = Fragment.const0(s"ON_ERROR = ${onError.asJson.noSpaces}") sql"""|COPY INTO $frCopy @@ -147,16 +137,14 @@ object Snowflake { throw new IllegalStateException("Snowflake Loader does not support table versioning") case Statement.RenameTable(from, to) => - Fragment.const0(s"ALTER TABLE $from RENAME TO $to") - case Statement.SetSearchPath => - Fragment.const0(s"USE SCHEMA ${schema}") + Fragment.const0(s"ALTER TABLE ${qualify(from)} RENAME TO ${qualify(to)}") case Statement.GetColumns(tableName) => val frSchemaName = Fragment.const0(schema.toUpperCase) val frTableName = Fragment.const0(tableName.toUpperCase) // Querying information_schema can be slow, but I couldn't find a way to select columns in 'show columns' query sql"SELECT column_name FROM information_schema.columns WHERE table_name = '$frTableName' AND table_schema = '$frSchemaName'" case Statement.ManifestAdd(message) => - val tableName = Fragment.const(s"${schema}.manifest") + val tableName = Fragment.const(qualify(Manifest.Name)) val types = Fragment.const0(s"parse_json('${message.types.asJson.noSpaces}')") // Redshift JDBC doesn't accept java.time.Instant sql"""INSERT INTO $tableName @@ -173,7 +161,7 @@ object Snowflake { base, types, shredding_started, shredding_completed, min_collector_tstamp, max_collector_tstamp, compression, processor_artifact, processor_version, count_good - FROM ${Fragment.const0(s"$schema.${Manifest.Name}")} WHERE base = $base""" + FROM ${Fragment.const0(qualify(Manifest.Name))} WHERE base = $base""" case Statement.AddLoadTstampColumn => sql"""ALTER TABLE ${Fragment.const0(EventsTable.withSchema(schema))} ADD COLUMN load_tstamp TIMESTAMP NULL""" @@ -181,7 +169,7 @@ object Snowflake { case Statement.CreateTable(ddl) => ddl case Statement.CommentOn(tableName, comment) => - val table = Fragment.const0(tableName) + val table = Fragment.const0(qualify(tableName)) val content = Fragment.const0(comment) sql"COMMENT IF EXISTS ON TABLE $table IS '$content'" case Statement.DdlFile(ddl) => @@ -189,7 +177,8 @@ object Snowflake { case Statement.AlterTable(ddl) => ddl } - def qualify(tableName: String): String = + + private def qualify(tableName: String): String = s"$schema.$tableName" }