Skip to content

Commit

Permalink
Loader: issue a single COPY statement for each unique schema for Redshift
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Mar 8, 2023
1 parent 7f9b2eb commit 89f5e10
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ object Redshift {
): LoadStatements = {
val shreddedStatements = discovery.shreddedTypes
.filterNot(_.isAtomic)
.groupBy(_.getLoadPath)
.values
.map(_.head) // So we get only one copy statement for given path
.map(shreddedType => loadAuthMethod => Statement.ShreddedCopy(shreddedType, discovery.compression, loadAuthMethod))
.toList

val atomic = { loadAuthMethod: LoadAuthMethod =>
Statement.EventsCopy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
package com.snowplowanalytics.snowplow.loader.redshift

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.iglu.schemaddl.migrations.{Migration => SchemaMigration, SchemaList}
import com.snowplowanalytics.iglu.schemaddl.redshift.{AddColumn, AlterTable, AlterType, CompressionEncoding, RedshiftVarchar, ZstdEncoding}

import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Target}

import org.specs2.mutable.Specification

import com.snowplowanalytics.snowplow.loader.redshift.db.MigrationSpec
import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers.validConfig
import com.snowplowanalytics.snowplow.rdbloader.cloud.LoadAuthService
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.SnowplowEntity.{Context, SelfDescribingEvent}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage.Folder
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType.{Info, Tabular}
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery

class RedshiftSpec extends Specification {
import RedshiftSpec.redshift
Expand Down Expand Up @@ -67,6 +70,36 @@ class RedshiftSpec extends Specification {
case _ => ko("Unexpected block found")
}
}

"getLoadStatements should return one COPY per unique schema (vendor, name, model)" in {
  val base = Folder.coerce("s3://my-bucket/my-path")

  // Each (vendor, name, model) triple is discovered twice — once as a
  // self-describing event and once as a context. The loader must collapse
  // each pair into a single COPY statement.
  val discoveredTypes =
    for {
      schemaModel <- List(2, 3)
      entityKind  <- List(SelfDescribingEvent, Context)
    } yield Tabular(Info(vendor = "com.acme", name = "event", model = schemaModel, entity = entityKind, base = base))

  val dataDiscovery = DataDiscovery(
    base,
    discoveredTypes,
    Compression.None,
    TypesInfo.Shredded(List.empty),
    Nil
  )

  // Resolve every statement with no-op credentials and compare titles only.
  val copyTitles = redshift
    .getLoadStatements(dataDiscovery, List.empty, ())
    .map(mkStatement => mkStatement(LoadAuthService.LoadAuthMethod.NoCreds).title)

  copyTitles.size must beEqualTo(3)
  copyTitles.toList must containTheSameElementsAs(
    List(
      "COPY events FROM s3://my-bucket/my-path/", // atomic
      "COPY com_acme_event_2 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=2",
      "COPY com_acme_event_3 FROM s3://my-bucket/my-path/output=good/vendor=com.acme/name=event/format=tsv/model=3"
    )
  )
}
}
}

Expand Down

0 comments on commit 89f5e10

Please sign in to comment.