diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index 69e9d818e..0ee1ce586 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -87,7 +87,11 @@ object Redshift { ): LoadStatements = { val shreddedStatements = discovery.shreddedTypes .filterNot(_.isAtomic) + .groupBy(_.getLoadPath) + .values + .map(_.head) // So we get only one copy statement for given path .map(shreddedType => loadAuthMethod => Statement.ShreddedCopy(shreddedType, discovery.compression, loadAuthMethod)) + .toList val atomic = { loadAuthMethod: LoadAuthMethod => Statement.EventsCopy( diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 20d22025d..21f05de86 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -13,16 +13,19 @@ package com.snowplowanalytics.snowplow.loader.redshift import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer} - import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration => SchemaMigration, SchemaList} import com.snowplowanalytics.iglu.schemaddl.redshift.{AddColumn, AlterTable, AlterType, CompressionEncoding, RedshiftVarchar, ZstdEncoding} - import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Target} - import org.specs2.mutable.Specification - import com.snowplowanalytics.snowplow.loader.redshift.db.MigrationSpec import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers.validConfig +import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity.{Context, SelfDescribingEvent} +import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo +import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Folder +import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType.{Info, Tabular} +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery class RedshiftSpec extends Specification { import RedshiftSpec.redshift @@ -67,6 +70,36 @@ class RedshiftSpec extends Specification { case _ => ko("Unexpected block found") } } + + "getLoadStatements should return one COPY per unique schema (vendor, name, model)" in { + val shreddedTypes = List( + Info(vendor = "com.acme", name = "event", model = 2, entity = SelfDescribingEvent, base = Folder.coerce("s3://my-bucket/my-path")), + Info(vendor = "com.acme", name = "event", model = 2, entity = Context, base = Folder.coerce("s3://my-bucket/my-path")), + Info(vendor = "com.acme", name = "event", model = 3, entity = SelfDescribingEvent, base = Folder.coerce("s3://my-bucket/my-path")), + Info(vendor = "com.acme", name = "event", model = 3, entity = Context, base = Folder.coerce("s3://my-bucket/my-path")) + ).map(Tabular) + + val discovery = DataDiscovery( + Folder.coerce("s3://my-bucket/my-path"), + shreddedTypes, + Compression.None, + TypesInfo.Shredded(List.empty), + Nil + ) + + val result = redshift + .getLoadStatements(discovery, List.empty, ()) + .map(f => f(LoadAuthService.LoadAuthMethod.NoCreds).title) + + result.size must beEqualTo(3) + result.toList must containTheSameElementsAs( + List( + "COPY events FROM s3://my-bucket/my-path/", // atomic + "COPY com_acme_event_2 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=2", + "COPY com_acme_event_3 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=3" + ) + ) + } } }