From 8642218753bc6bcc85969a9f6cdd43dabbbc0748 Mon Sep 17 00:00:00 2001 From: spenes Date: Fri, 10 Nov 2023 14:00:04 +0300 Subject: [PATCH] enrich-pubsub: set UserAgent header in Pubsub publisher and consumer (close #826) --- build.sbt | 6 +- .../enrich/common/fs2/config/ConfigFile.scala | 2 +- .../common/fs2/config/ParsedConfigs.scala | 2 +- .../enrich/common/fs2/config/io.scala | 27 ++- .../common/fs2/config/ConfigFileSpec.scala | 81 ++----- .../common/fs2/config/ParsedConfigsSpec.scala | 45 +++- .../src/main/resources/application.conf | 20 +- .../snowplow/enrich/pubsub/Sink.scala | 3 +- .../snowplow/enrich/pubsub/Source.scala | 5 +- .../snowplow/enrich/pubsub/Utils.scala | 24 +++ .../snowplow/enrich/pubsub/ConfigSpec.scala | 199 ++++++++++++++++++ .../enrich/pubsub/GcpUserAgentSpec.scala | 39 ++++ project/BuildSettings.scala | 2 +- project/Dependencies.scala | 4 +- 14 files changed, 368 insertions(+), 91 deletions(-) create mode 100644 modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Utils.scala create mode 100644 modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala create mode 100644 modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcpUserAgentSpec.scala diff --git a/build.sbt b/build.sbt index baa5db1b3..6e1bd9a1c 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ lazy val root = project.in(file(".")) .settings(projectSettings) .settings(compilerSettings) .settings(resolverSettings) - .aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, kafka, kafkaDistroless, rabbitmq, rabbitmqDistroless, nsq, nsqDistroless) + .aggregate(common, commonFs2, pubsub, kinesis, streamCommon, streamKinesis, streamKafka, streamNsq, streamStdin, kafka, rabbitmq, nsq) lazy val common = project .in(file("modules/common")) @@ 
-115,7 +115,7 @@ lazy val pubsub = project .settings(libraryDependencies ++= pubsubDependencies) .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) - .dependsOn(commonFs2) + .dependsOn(commonFs2 % "test->test;compile->compile") lazy val pubsubDistroless = project .in(file("modules/distroless/pubsub")) @@ -125,7 +125,7 @@ lazy val pubsubDistroless = project .settings(libraryDependencies ++= pubsubDependencies) .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) - .dependsOn(commonFs2) + .dependsOn(commonFs2 % "test->test;compile->compile") lazy val kinesis = project diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index ec4d71136..d050e3ed3 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -67,7 +67,7 @@ object ConfigFile { case c @ ConfigFile(_, Outputs(good, Some(output: Output.Kinesis), bad), _, _, _, _, _, _, _, _) if output.streamName.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight // Remove pii output if topic empty - case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty => + case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if t.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight // Remove pii output if topic empty case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _, _) if topicName.isEmpty => diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala 
b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala index 8c7409786..1173703f2 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala @@ -147,7 +147,7 @@ object ParsedConfigs { private[config] def outputAttributes(output: OutputConfig): EnrichedEvent => Map[String, String] = output match { - case OutputConfig.PubSub(_, Some(attributes), _, _, _) => attributesFromFields(attributes) + case OutputConfig.PubSub(_, Some(attributes), _, _, _, _) => attributesFromFields(attributes) case OutputConfig.Kafka(_, _, _, headers, _) => attributesFromFields(headers) case _ => _ => Map.empty } diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index 53b89971b..eb07870a9 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -126,7 +126,8 @@ object io { parallelPullCount: Int, maxQueueSize: Int, maxRequestBytes: Int, - maxAckExtensionPeriod: FiniteDuration + maxAckExtensionPeriod: FiniteDuration, + gcpUserAgent: GcpUserAgent ) extends Input { val (project, name) = subscription.split("/").toList match { @@ -224,7 +225,7 @@ object io { implicit val inputDecoder: Decoder[Input] = deriveConfiguredDecoder[Input] .emap { - case s @ PubSub(sub, _, _, _, _) => + case s @ PubSub(sub, _, _, _, _, _) => sub.split("/").toList match { case List("projects", _, "subscriptions", _) => s.asRight @@ -236,13 +237,13 @@ object io { case other => other.asRight } .emap { - case PubSub(_, p, _, _, _) if p <= 0 => + case PubSub(_, p, _, _, _, _) 
if p <= 0 => "PubSub parallelPullCount must be > 0".asLeft - case PubSub(_, _, m, _, _) if m <= 0 => + case PubSub(_, _, m, _, _, _) if m <= 0 => "PubSub maxQueueSize must be > 0".asLeft - case PubSub(_, _, _, m, _) if m <= 0 => + case PubSub(_, _, _, m, _, _) if m <= 0 => "PubSub maxRequestBytes must be > 0".asLeft - case PubSub(_, _, _, _, m) if m < Duration.Zero => + case PubSub(_, _, _, _, m, _) if m < Duration.Zero => "PubSub maxAckExtensionPeriod must be >= 0".asLeft case other => other.asRight @@ -285,7 +286,8 @@ object io { attributes: Option[Set[String]], delayThreshold: FiniteDuration, maxBatchSize: Long, - maxBatchBytes: Long + maxBatchBytes: Long, + gcpUserAgent: GcpUserAgent ) extends Output { val (project, name) = topic.split("/").toList match { @@ -320,7 +322,7 @@ object io { .emap { case Kafka(topicName, bootstrapServers, _, _, _) if topicName.isEmpty ^ bootstrapServers.isEmpty => "Both topicName and bootstrapServers have to be set".asLeft - case s @ PubSub(top, _, _, _, _) if top.nonEmpty => + case s @ PubSub(top, _, _, _, _, _) if top.nonEmpty => top.split("/").toList match { case List("projects", _, "topics", _) => s.asRight @@ -513,6 +515,15 @@ object io { ) } + case class GcpUserAgent(productName: String) + + object GcpUserAgent { + implicit val gcpUserAgentDecoder: Decoder[GcpUserAgent] = + deriveConfiguredDecoder[GcpUserAgent] + implicit val gcpUserAgentEncoder: Encoder[GcpUserAgent] = + deriveConfiguredEncoder[GcpUserAgent] + } + object AdaptersSchemasEncoderDecoders { implicit val adaptersSchemasDecoder: Decoder[AdaptersSchemas] = deriveConfiguredDecoder[AdaptersSchemas] diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala index 4f7b1cbc0..32fddd28d 100644 --- 
a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala @@ -35,67 +35,6 @@ import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas class ConfigFileSpec extends Specification with CatsIO { "parse" should { - "parse reference example for PubSub" in { - val configPath = Paths.get(getClass.getResource("/config.pubsub.extended.hocon").toURI) - val expected = ConfigFile( - io.Input.PubSub("projects/test-project/subscriptions/collector-payloads-sub", 1, 3000, 50000000, 1.hour), - io.Outputs( - io.Output.PubSub("projects/test-project/topics/enriched", Some(Set("app_id")), 200.milliseconds, 1000, 8000000), - Some(io.Output.PubSub("projects/test-project/topics/pii", None, 200.milliseconds, 1000, 8000000)), - io.Output.PubSub("projects/test-project/topics/bad", None, 200.milliseconds, 1000, 8000000) - ), - io.Concurrency(256, 3), - Some(7.days), - io.RemoteAdapterConfigs( - 10.seconds, - 45.seconds, - 10, - List( - io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") - ) - ), - io.Monitoring( - Some(Sentry(URI.create("http://sentry.acme.com"))), - io.MetricsReporters( - Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), - Some(io.MetricsReporters.Stdout(10.seconds, None)), - true - ) - ), - io.Telemetry( - false, - 15.minutes, - "POST", - "collector-g.snowplowanalytics.com", - 443, - true, - Some("my_pipeline"), - Some("hfy67e5ydhtrd"), - Some("665bhft5u6udjf"), - Some("enrich-kinesis-ce"), - Some("1.0.0") - ), - io.FeatureFlags( - false, - false, - false - ), - Some( - io.Experimental( - Some( - io.Metadata( - Uri.uri("https://my_pipeline.my_domain.com/iglu"), - 5.minutes, - UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), - UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") - ) - ) - ) - ), - 
adaptersSchemas - ) - ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) - } "parse reference example for Kinesis" in { val configPath = Paths.get(getClass.getResource("/config.kinesis.extended.hocon").toURI) @@ -422,7 +361,10 @@ class ConfigFileSpec extends Specification with CatsIO { "parallelPullCount": 1, "maxQueueSize": 3000, "maxRequestBytes": 50000000, - "maxAckExtensionPeriod": 1 hour + "maxAckExtensionPeriod": 1 hour, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } }, "output": { "good": { @@ -430,21 +372,30 @@ class ConfigFileSpec extends Specification with CatsIO { "topic": "projects/test-project/topics/good-topic", "delayThreshold": "200 milliseconds", "maxBatchSize": 1000, - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } }, "pii": { "type": "PubSub", "topic": "projects/test-project/topics/pii-topic", "delayThreshold": "200 milliseconds", "maxBatchSize": 1000, - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } }, "bad": { "type": "PubSub", "topic": "projects/test-project/topics/bad-topic", "delayThreshold": "200 milliseconds", "maxBatchSize": 1000, - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } }, "concurrency": { diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index 66ceeba07..b42684f20 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -37,13 +37,41 @@ class ParsedConfigsSpec extends Specification with CatsIO { val invalidAttr2 = 
"invalidAttr2" val configFile = ConfigFile( - io.Input.PubSub("projects/test-project/subscriptions/inputSub", 1, 3000, 50000000, 1.hour), + io.Input.PubSub( + "projects/test-project/subscriptions/inputSub", + 1, + 3000, + 50000000, + 1.hour, + io.GcpUserAgent("Snowplow OSS") + ), io.Outputs( - io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), 200.milliseconds, 1000, 10000000), + io.Output.PubSub( + "projects/test-project/topics/good-topic", + Some(Set("app_id", invalidAttr1)), + 200.milliseconds, + 1000, + 10000000, + io.GcpUserAgent("Snowplow OSS") + ), Some( - io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), 200.milliseconds, 1000, 10000000) + io.Output.PubSub( + "projects/test-project/topics/pii-topic", + Some(Set("app_id", invalidAttr2)), + 200.milliseconds, + 1000, + 10000000, + io.GcpUserAgent("Snowplow OSS") + ) ), - io.Output.PubSub("projects/test-project/topics/bad-topic", None, 200.milliseconds, 1000, 10000000) + io.Output.PubSub( + "projects/test-project/topics/bad-topic", + None, + 200.milliseconds, + 1000, + 10000000, + io.GcpUserAgent("Snowplow OSS") + ) ), io.Concurrency(10000, 64), Some(7.days), @@ -100,7 +128,14 @@ class ParsedConfigsSpec extends Specification with CatsIO { "outputAttributes" should { "fetch attribute values" in { - val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), 200.milliseconds, 1000, 10000000) + val output = io.Output.PubSub( + "projects/test-project/topics/good-topic", + Some(Set("app_id")), + 200.milliseconds, + 1000, + 10000000, + io.GcpUserAgent("Snowplow OSS") + ) val ee = new EnrichedEvent() ee.app_id = "test_app" diff --git a/modules/pubsub/src/main/resources/application.conf b/modules/pubsub/src/main/resources/application.conf index 0d4f18ce7..c8e13272f 100644 --- a/modules/pubsub/src/main/resources/application.conf +++ b/modules/pubsub/src/main/resources/application.conf @@ -4,7 +4,10 
@@ "parallelPullCount": 1 "maxQueueSize": 3000 "maxRequestBytes": 50000000 - "maxAckExtensionPeriod": 1 hour + "maxAckExtensionPeriod": 1 hour, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } "output": { @@ -12,7 +15,10 @@ "type": "PubSub" "delayThreshold": 200 milliseconds "maxBatchSize": 1000 - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } "pii": { @@ -21,14 +27,20 @@ "topic": "" "delayThreshold": 200 milliseconds "maxBatchSize": 1000 - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } "bad": { "type": "PubSub" "delayThreshold": 200 milliseconds "maxBatchSize": 1000 - "maxBatchBytes": 8000000 + "maxBatchBytes": 8000000, + "gcpUserAgent": { + "productName": "Snowplow OSS" + } } } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala index c24ec93a7..0478f25d5 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala @@ -61,7 +61,8 @@ object Sink { batchSize = output.maxBatchSize, requestByteThreshold = Some(output.maxBatchBytes), delayThreshold = output.delayThreshold, - onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error") + onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error"), + customizePublisher = Some(_.setHeaderProvider(Utils.createPubsubUserAgentHeader(output.gcpUserAgent))) ) GooglePubsubProducer diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala index 2df576d06..cc3e89d20 100644 --- 
a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Source.scala @@ -62,7 +62,10 @@ object Source { parallelPullCount = input.parallelPullCount, maxQueueSize = input.maxQueueSize, maxAckExtensionPeriod = input.maxAckExtensionPeriod, - customizeSubscriber = Some(builder => builder.setFlowControlSettings(flowControlSettings)) + customizeSubscriber = Some { + _.setFlowControlSettings(flowControlSettings) + .setHeaderProvider(Utils.createPubsubUserAgentHeader(input.gcpUserAgent)) + } ) val projectId = Model.ProjectId(input.project) diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Utils.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Utils.scala new file mode 100644 index 000000000..0950446a7 --- /dev/null +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Utils.scala @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.enrich.pubsub + +import com.google.api.gax.rpc.FixedHeaderProvider + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.GcpUserAgent + +object Utils { + /** Builds a [[FixedHeaderProvider]] holding a single "user-agent" header whose value is `<gcpUserAgent.productName>/enrich (GPN:Snowplow;)`. */ + def createPubsubUserAgentHeader(gcpUserAgent: GcpUserAgent): FixedHeaderProvider = + FixedHeaderProvider.create("user-agent", s"${gcpUserAgent.productName}/enrich (GPN:Snowplow;)") + +} diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala new file mode 100644 index 000000000..2aac06fb6 --- /dev/null +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/ConfigSpec.scala @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.enrich.pubsub + +import java.net.URI +import java.util.UUID +import java.nio.file.Paths + +import scala.concurrent.duration._ + +import cats.syntax.either._ +import cats.effect.IO + +import org.http4s.Uri + +import cats.effect.testing.specs2.CatsIO + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config._ + +import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas + +import org.specs2.mutable.Specification +/** Checks that the PubSub example configs found via `getClass.getResource` (`/config.pubsub.extended.hocon`, `/config.pubsub.minimal.hocon`) parse into the expected [[ConfigFile]] values. */ +class ConfigSpec extends Specification with CatsIO { + + "parse" should { + "parse reference example for PubSub" in { + val configPath = Paths.get(getClass.getResource("/config.pubsub.extended.hocon").toURI) + val expected = ConfigFile( + io.Input.PubSub( + "projects/test-project/subscriptions/collector-payloads-sub", + 1, + 3000, + 50000000, + 1.hour, + io.GcpUserAgent("Snowplow OSS") + ), + io.Outputs( + io.Output.PubSub( + "projects/test-project/topics/enriched", + Some(Set("app_id")), + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ), + Some( + io.Output.PubSub( + "projects/test-project/topics/pii", + None, + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ) + ), + io.Output.PubSub( + "projects/test-project/topics/bad", + None, + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ) + ), + io.Concurrency(256, 3), + Some(7.days), + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List( + io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") + ) + ), + io.Monitoring( + Some(Sentry(URI.create("http://sentry.acme.com"))), + io.MetricsReporters( + Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), + Some(io.MetricsReporters.Stdout(10.seconds, None)), + false + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + Some("my_pipeline"), + Some("hfy67e5ydhtrd"), 
+ Some("665bhft5u6udjf"), + Some("enrich-kinesis-ce"), + Some("1.0.0") + ), + io.FeatureFlags( + false, + false, + false + ), + Some( + io.Experimental( + Some( + io.Metadata( + Uri.uri("https://my_pipeline.my_domain.com/iglu"), + 5.minutes, + UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), + UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") + ) + ) + ) + ), + adaptersSchemas + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + /* Minimal example: the expected ConfigFile has no pii output, Sentry, metrics reporters, telemetry identifiers or experimental section (all None). */ + "parse minimal example for PubSub" in { + val configPath = Paths.get(getClass.getResource("/config.pubsub.minimal.hocon").toURI) + val expected = ConfigFile( + io.Input.PubSub( + "projects/test-project/subscriptions/collector-payloads-sub", + 1, + 3000, + 50000000, + 1.hour, + io.GcpUserAgent("Snowplow OSS") + ), + io.Outputs( + io.Output.PubSub( + "projects/test-project/topics/enriched", + None, + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ), + None, + io.Output.PubSub( + "projects/test-project/topics/bad", + None, + 200.milliseconds, + 1000, + 8000000, + io.GcpUserAgent("Snowplow OSS") + ) + ), + io.Concurrency(256, 3), + None, + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List() + ), + io.Monitoring( + None, + io.MetricsReporters( + None, + None, + false + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + None, + None, + None, + None, + None + ), + io.FeatureFlags( + false, + false, + false + ), + None, + adaptersSchemas + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + } +} diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcpUserAgentSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcpUserAgentSpec.scala new file mode 100644 index 000000000..bce36f7b9 --- /dev/null +++ 
b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/enrich/pubsub/GcpUserAgentSpec.scala @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019-2023 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.pubsub + +import java.util.regex.Pattern +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.GcpUserAgent +import org.specs2.mutable.Specification +/** Checks the "user-agent" header value produced by `Utils.createPubsubUserAgentHeader`, both literally and against a GPN-extracting pattern. */ +class GcpUserAgentSpec extends Specification { + + "createPubsubUserAgentHeader" should { + "create user agent string correctly" in { + val gcpUserAgent = GcpUserAgent(productName = "Snowplow OSS") + val resultUserAgent = Utils.createPubsubUserAgentHeader(gcpUserAgent).getHeaders.get("user-agent") + val expectedUserAgent = "Snowplow OSS/enrich (GPN:Snowplow;)" + /* Group 1 is greedy, so it keeps the ";" and the closing ")" satisfies the final character class. NOTE(review): the pattern presumably mirrors how the consumer of this header extracts the GPN value - confirm its origin. */ + val userAgentRegex = Pattern.compile( + """(?iU)(?:[^\(\)\/]+\/[^\/]+\s+)*(?:[^\s][^\(\)\/]+\/[^\/]+\s?\([^\(\)]*)gpn:(.*)[;\)]""" + ) + val matcher = userAgentRegex.matcher(resultUserAgent) + val matched = if (matcher.find()) Some(matcher.group(1)) else None + val expectedMatched = "Snowplow;" + + resultUserAgent must beEqualTo(expectedUserAgent) + matched must beSome(expectedMatched) + } + } + +} diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 3a54d1a6c..1b8d8102f 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -303,7 +303,7 @@ 
object BuildSettings { assemblySettings ++ dockerSettingsFocal ++ Seq(Docker / packageName := "snowplow-enrich-pubsub") ++ // Tests - scoverageSettings ++ noParallelTestExecution + scoverageSettings ++ noParallelTestExecution ++ addExampleConfToTestCp } lazy val pubsubDistrolessBuildSettings = pubsubBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1351d5838..ac340944b 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -338,7 +338,9 @@ object Dependencies { val pubsubDependencies = Seq( fs2BlobGcs, gcs, - fs2PubSub + fs2PubSub, + specs2, + specs2CE ) val kinesisDependencies = Seq(