diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d2035f9c1..778f999be 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,8 @@ jobs: "project streamKafka; set assembly / test := {}; assembly" \ "project streamNsq; set assembly / test := {}; assembly" \ "project pubsub; set assembly / test := {}; assembly" \ - "project kinesis; set assembly / test := {}; assembly" + "project kinesis; set assembly / test := {}; assembly" \ + "project rabbitmq; set assembly / test := {}; assembly" - name: Create GitHub release and attach artifacts uses: softprops/action-gh-release@v1 with: @@ -98,6 +99,7 @@ jobs: modules/stream/nsq/target/scala-2.12/snowplow-stream-enrich-nsq-${{ steps.ver.outputs.tag }}.jar modules/pubsub/target/scala-2.12/snowplow-enrich-pubsub-${{ steps.ver.outputs.tag }}.jar modules/kinesis/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar + modules/rabbitmq/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -113,6 +115,11 @@ jobs: - streamNsq - pubsub - kinesis + - rabbitmq + include: + - suffix: "" + - suffix: -experimental + app: rabbitmq steps: - uses: actions/checkout@v2 if: startsWith(github.ref, 'refs/tags/') @@ -144,7 +151,7 @@ jobs: - name: Get app package name id: packageName run: | - export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1) + export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1)${{ matrix.suffix }} echo "::set-output name=package_name::$PACKAGE_NAME" - name: Get app base directory id: baseDirectory diff --git a/build.sbt b/build.sbt index 2ed411c6b..ba3e8806f 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ lazy val root = project.in(file(".")) .settings(projectSettings) .settings(compilerSettings) .settings(resolverSettings) - .aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin) + .aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, rabbitmq, rabbitmqDistroless) lazy val common = project .in(file("modules/common")) @@ -155,3 +155,22 @@ lazy val bench = project .in(file("modules/bench")) .dependsOn(pubsub % "test->test") .enablePlugins(JmhPlugin) + +lazy val rabbitmq = project + .in(file("modules/rabbitmq")) + .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin) + .settings(rabbitmqBuildSettings) + .settings(libraryDependencies ++= rabbitmqDependencies) + .settings(excludeDependencies ++= exclusions) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2) + +lazy val rabbitmqDistroless = project + .in(file("modules/distroless/rabbitmq")) + .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin, LauncherJarPlugin) + .settings(sourceDirectory := (rabbitmq / sourceDirectory).value) + .settings(rabbitmqDistrolessBuildSettings) + .settings(libraryDependencies ++= rabbitmqDependencies) + .settings(excludeDependencies ++= exclusions) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2) diff --git a/config/config.rabbitmq.extended.hocon b/config/config.rabbitmq.extended.hocon new file mode 100644 index 000000000..0f824019b --- /dev/null +++ b/config/config.rabbitmq.extended.hocon @@ -0,0 +1,281 @@ +{ + # Where to read collector payloads from + "input": { + "type": "RabbitMQ" + + "cluster": { + # Nodes of RabbitMQ cluster + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + # Username to connect to the cluster + "username": "guest" + # Password to connect to the cluster + "password": "guest" + # Virtual host to use when connecting to the cluster + "virtualHost": "/" + + # Optional. Whether to use SSL or not to communicate with the cluster + "ssl": false + # Optional. Timeout for the connection to the cluster (in seconds) + "connectionTimeout": 5 + # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver + "internalQueueSize": 1000 + # Optional. Whether the AMQP Java driver should try to recover broken connections + "automaticRecovery": true + # Optional. Interval to check that the TCP connection to the cluster is still alive + "requestedHeartbeat": 100 + } + + # Queue to read collector payloads from + "queue": "raw" + + # Optional. Settings for backoff policy for checkpointing. + # Records are checkpointed after all the records of the same chunk have been enriched + "checkpointBackoff": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + + "output": { + # Enriched events output + "good": { + "type": "RabbitMQ" + + "cluster": { + # Nodes of RabbitMQ cluster + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + # Username to connect to the cluster + "username": "guest" + # Password to connect to the cluster + "password": "guest" + # Virtual host to use when connecting to the cluster + "virtualHost": "/" + + # Optional. Whether to use SSL or not to communicate with the cluster + "ssl": false + # Optional. Timeout for the connection to the cluster (in seconds) + "connectionTimeout": 5 + # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver + "internalQueueSize": 1000 + # Optional. Whether the AMQP Java driver should try to recover broken connections + "automaticRecovery": true + # Optional. Interval to check that the TCP connection to the cluster is still alive + "requestedHeartbeat": 100 + } + + # Exchange to send the enriched events to + "exchange": "enriched" + # Routing key to use when sending the enriched events to the exchange + "routingKey": "enriched" + + # Optional. Policy to retry if writing to RabbitMQ fails + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + + # Bad rows output + "bad": { + "type": "RabbitMQ" + + "cluster": { + # Nodes of RabbitMQ cluster + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + # Username to connect to the cluster + "username": "guest" + # Password to connect to the cluster + "password": "guest" + # Virtual host to use when connecting to the cluster + "virtualHost": "/" + + # Optional. Whether to use SSL or not to communicate with the cluster + "ssl": false + # Optional. Timeout for the connection to the cluster (in seconds) + "connectionTimeout": 5 + # Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver + "internalQueueSize": 1000 + # Optional. Whether the AMQP Java driver should try to recover broken connections + "automaticRecovery": true + # Optional. Interval to check that the TCP connection to the cluster is still alive + "requestedHeartbeat": 100 + } + + # Exchange to send the bad rows to + "exchange": "bad-1" + # Routing key to use when sending the bad rows to the exchange + "routingKey": "bad-1" + + # Optional. Policy to retry if writing to RabbitMQ fails + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + } + + # Optional. Concurrency of the app + "concurrency" : { + # Number of events that can get enriched at the same time within a chunk + "enrich": 256 + # Number of chunks that can get sunk at the same time + "sink": 3 + } + + # Optional, period after which enrich assets should be checked for updates + # no assets will be updated if the key is absent + "assetsUpdatePeriod": "7 days" + + # Optional, configuration of remote adapters + "remoteAdapters": { + # how long enrich waits to establish a connection to remote adapters + "connectionTimeout": "10 seconds" + # how long enrich waits to get a response from remote adapters + "readTimeout": "45 seconds" + # how many connections enrich opens at maximum for remote adapters + # increasing this could help with throughput in case of adapters with high latency + "maxConnections": 10 + # a list of remote adapter configs + "configs": [ + { + "vendor": "com.example" + "version": "v1" + "url": "https://remote-adapter.com" + } + ] + } + + "monitoring": { + + # Optional, for tracking runtime exceptions + "sentry": { + "dsn": "http://sentry.acme.com" + } + + # Optional, configure how metrics are reported + "metrics": { + + # Optional. Send metrics to a StatsD server on localhost + "statsd": { + "hostname": "localhost" + "port": 8125 + + # Required, how frequently to report metrics + "period": "10 seconds" + + # Any key-value pairs to be tagged on every StatsD metric + "tags": { + "app": enrich + } + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. Log to stdout using Slf4j + "stdout": { + "period": "10 seconds" + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. Send KCL and KPL metrics to Cloudwatch + "cloudwatch": true + } + } + + # Optional, configure telemetry + # All the fields are optional + "telemetry": { + + # Set to true to disable telemetry + "disable": false + + # Interval for the heartbeat event + "interval": 15 minutes + + # HTTP method used to send the heartbeat event + "method": POST + + # URI of the collector receiving the heartbeat event + "collectorUri": collector-g.snowplowanalytics.com + + # Port of the collector receiving the heartbeat event + "collectorPort": 443 + + # Whether to use https or not + "secure": true + + # Identifier intended to tie events together across modules, + # infrastructure and apps when used consistently + "userProvidedId": my_pipeline + + # ID automatically generated upon running a modules deployment script + # Intended to identify each independent module, and the infrastructure it controls + "autoGeneratedId": hfy67e5ydhtrd + + # Unique identifier for the VM instance + # Unique for each instance of the app running within a module + "instanceId": 665bhft5u6udjf + + # Name of the terraform module that deployed the app + "moduleName": enrich-rabbitmq-ce + + # Version of the terraform module that deployed the app + "moduleVersion": 1.0.0 + } + + # Optional. To activate/deactive enrich features that are still in beta + # or that are here for transition. + # This section might change in future versions + "featureFlags" : { + + # Enrich 3.0.0 introduces the validation of the enriched events against atomic schema + # before emitting. + # If set to false, a bad row will be emitted instead of the enriched event + # if validation fails. + # If set to true, invalid enriched events will be emitted, as before. + # WARNING: this feature flag will be removed in a future version + # and it will become impossible to emit invalid enriched events. + # More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690 + "acceptInvalid": false + + # In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment + # incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true + # to keep the erroneous behaviour of those previous versions. This flag will be removed in a + # future version. + # More details: https://github.com/snowplow/enrich/issues/619 + "legacyEnrichmentOrder": false + } + + # Optional. Configuration for experimental/preview features + "experimental": { + # Whether to export metadata using a webhook URL. + # Follows iglu-webhook protocol. + "metadata": { + "endpoint": "https://my_pipeline.my_domain.com/iglu" + "interval": 5 minutes + "organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455" + "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784" + } + } +} diff --git a/config/config.rabbitmq.minimal.hocon b/config/config.rabbitmq.minimal.hocon new file mode 100644 index 000000000..35eb287b9 --- /dev/null +++ b/config/config.rabbitmq.minimal.hocon @@ -0,0 +1,48 @@ +{ + "input": { + "cluster": { + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + "username": "guest" + "password": "guest" + "virtualHost": "/" + } + "queue": "raw" + } + + "output": { + "good": { + "cluster": { + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + "username": "guest" + "password": "guest" + "virtualHost": "/" + } + "exchange": "enriched" + } + + "bad": { + "cluster": { + "nodes": [ + { + "host": "localhost" + "port": 5672 + } + ] + "username": "guest" + "password": "guest" + "virtualHost": "/" + } + "exchange": "bad-1" + } + } +} diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index af65572b7..08828bf0d 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -18,6 +18,7 @@ import java.net.URI import java.util.UUID import cats.syntax.either._ +import cats.data.NonEmptyList import scala.concurrent.duration.{Duration, FiniteDuration} @@ -59,19 +60,47 @@ object io { import ConfigFile.finiteDurationEncoder - case class CheckpointBackoff( + case class BackoffPolicy( minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRetries: Int ) - implicit val checkpointBackoffDecoder: Decoder[CheckpointBackoff] = deriveConfiguredDecoder[CheckpointBackoff] - implicit val checkpointBackoffEncoder: Encoder[CheckpointBackoff] = deriveConfiguredEncoder[CheckpointBackoff] + object BackoffPolicy { + implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] = + deriveConfiguredDecoder[BackoffPolicy] + implicit def backoffPolicyEncoder: Encoder[BackoffPolicy] = + deriveConfiguredEncoder[BackoffPolicy] + } sealed trait RetryCheckpointing { - val checkpointBackoff: CheckpointBackoff + val checkpointBackoff: BackoffPolicy + } + + case class RabbitMQNode( + host: String, + port: Int + ) + object RabbitMQNode { + implicit val rabbitMQNodeDecoder: Decoder[RabbitMQNode] = deriveConfiguredDecoder[RabbitMQNode] + implicit val rabbitMQNodeEncoder: Encoder[RabbitMQNode] = deriveConfiguredEncoder[RabbitMQNode] + } + + case class RabbitMQConfig( + nodes: NonEmptyList[RabbitMQNode], + username: String, + password: String, + virtualHost: String, + connectionTimeout: Int, + ssl: Boolean, + internalQueueSize: Int, + requestedHeartbeat: Int, + automaticRecovery: Boolean + ) + object RabbitMQConfig { + implicit val rabbitMQConfigDecoder: Decoder[RabbitMQConfig] = deriveConfiguredDecoder[RabbitMQConfig] + implicit val rabbitMQConfigEncoder: Encoder[RabbitMQConfig] = deriveConfiguredEncoder[RabbitMQConfig] } - /** Source of raw collector data (only PubSub supported atm) */ sealed trait Input object Input { @@ -99,12 +128,18 @@ object io { initialPosition: Kinesis.InitPosition, retrievalMode: Kinesis.Retrieval, bufferSize: Int, - checkpointBackoff: CheckpointBackoff, + checkpointBackoff: BackoffPolicy, customEndpoint: Option[URI], dynamodbCustomEndpoint: Option[URI], cloudwatchCustomEndpoint: Option[URI] ) extends Input with RetryCheckpointing + case class RabbitMQ( + cluster: RabbitMQConfig, + queue: String, + checkpointBackoff: BackoffPolicy + ) extends Input + with RetryCheckpointing object Kinesis { sealed trait InitPosition @@ -238,18 +273,12 @@ object io { byteLimit: Int, customEndpoint: Option[URI] ) extends Output - - case class BackoffPolicy( - minBackoff: FiniteDuration, - maxBackoff: FiniteDuration, - maxRetries: Int - ) - object BackoffPolicy { - implicit def backoffPolicyDecoder: Decoder[BackoffPolicy] = - deriveConfiguredDecoder[BackoffPolicy] - implicit def backoffPolicyEncoder: Encoder[BackoffPolicy] = - deriveConfiguredEncoder[BackoffPolicy] - } + case class RabbitMQ( + cluster: RabbitMQConfig, + exchange: String, + routingKey: String, + backoffPolicy: BackoffPolicy + ) extends Output implicit val outputDecoder: Decoder[Output] = deriveConfiguredDecoder[Output] diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala index c7fe9895f..9c6c9ab49 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFileSpec.scala @@ -19,6 +19,7 @@ import java.nio.file.Paths import scala.concurrent.duration._ import cats.syntax.either._ +import cats.data.NonEmptyList import cats.effect.IO @@ -102,7 +103,7 @@ class ConfigFileSpec extends Specification with CatsIO { io.Input.Kinesis.InitPosition.TrimHorizon, io.Input.Kinesis.Retrieval.Polling(10000), 3, - io.CheckpointBackoff(100.milli, 10.second, 10), + io.BackoffPolicy(100.milli, 10.second, 10), None, None, None @@ -112,7 +113,7 @@ class ConfigFileSpec extends Specification with CatsIO { "enriched", Some("eu-central-1"), None, - io.Output.BackoffPolicy(100.millis, 10.seconds, 10), + io.BackoffPolicy(100.millis, 10.seconds, 10), 500, 5242880, None @@ -122,7 +123,7 @@ class ConfigFileSpec extends Specification with CatsIO { "pii", Some("eu-central-1"), None, - io.Output.BackoffPolicy(100.millis, 10.seconds, 10), + io.BackoffPolicy(100.millis, 10.seconds, 10), 500, 5242880, None @@ -132,7 +133,7 @@ class ConfigFileSpec extends Specification with CatsIO { "bad", Some("eu-central-1"), None, - io.Output.BackoffPolicy(100.millis, 10.seconds, 10), + io.BackoffPolicy(100.millis, 10.seconds, 10), 500, 5242880, None @@ -189,6 +190,116 @@ class ConfigFileSpec extends Specification with CatsIO { ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) } + "parse reference example for RabbitMQ" in { + val configPath = Paths.get(getClass.getResource("/config.rabbitmq.extended.hocon").toURI) + val expected = ConfigFile( + io.Input.RabbitMQ( + io.RabbitMQConfig( + NonEmptyList.one( + io.RabbitMQNode("localhost", 5672) + ), + "guest", + "guest", + "/", + 5, + false, + 1000, + 100, + true + ), + "raw", + io.BackoffPolicy(100.millis, 10.seconds, 10) + ), + io.Outputs( + io.Output.RabbitMQ( + io.RabbitMQConfig( + NonEmptyList.one( + io.RabbitMQNode("localhost", 5672) + ), + "guest", + "guest", + "/", + 5, + false, + 1000, + 100, + true + ), + "enriched", + "enriched", + io.BackoffPolicy(100.millis, 10.seconds, 10) + ), + None, + io.Output.RabbitMQ( + io.RabbitMQConfig( + NonEmptyList.one( + io.RabbitMQNode("localhost", 5672) + ), + "guest", + "guest", + "/", + 5, + false, + 1000, + 100, + true + ), + "bad-1", + "bad-1", + io.BackoffPolicy(100.millis, 10.seconds, 10) + ) + ), + io.Concurrency(256, 3), + Some(7.days), + io.RemoteAdapterConfigs( + 10.seconds, + 45.seconds, + 10, + List( + io.RemoteAdapterConfig("com.example", "v1", "https://remote-adapter.com") + ) + ), + io.Monitoring( + Some(Sentry(URI.create("http://sentry.acme.com"))), + io.MetricsReporters( + Some(io.MetricsReporters.StatsD("localhost", 8125, Map("app" -> "enrich"), 10.seconds, None)), + Some(io.MetricsReporters.Stdout(10.seconds, None)), + true + ) + ), + io.Telemetry( + false, + 15.minutes, + "POST", + "collector-g.snowplowanalytics.com", + 443, + true, + Some("my_pipeline"), + Some("hfy67e5ydhtrd"), + Some("665bhft5u6udjf"), + Some("enrich-rabbitmq-ce"), + Some("1.0.0") + ), + io.FeatureFlags( + false, + false + ), + Some( + io.Experimental( + Some( + io.Metadata( + Uri.uri("https://my_pipeline.my_domain.com/iglu"), + 5.minutes, + UUID.fromString("c5f3a09f-75f8-4309-bec5-fea560f78455"), + UUID.fromString("75a13583-5c99-40e3-81fc-541084dfc784") + ) + ) + ) + ) + ) + ConfigFile.parse[IO](configPath.asRight).value.map(result => result must beRight(expected)) + } + "parse valid 0 minutes as None" in { val input = """{ diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala index 6c4f40381..da0cfc4ef 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala @@ -35,7 +35,7 @@ import com.amazonaws.services.kinesis.model._ import com.amazonaws.services.kinesis.{AmazonKinesis, AmazonKinesisClientBuilder} import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink} -import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Output} object Sink { @@ -97,7 +97,7 @@ object Sink { } } yield exists - private def getRetryPolicy[F[_]: Applicative](config: Output.BackoffPolicy): RetryPolicy[F] = + private def getRetryPolicy[F[_]: Applicative](config: BackoffPolicy): RetryPolicy[F] = capDelay[F](config.maxBackoff, fullJitter[F](config.minBackoff)) .join(limitRetries(config.maxRetries)) diff --git a/modules/rabbitmq/src/main/resources/application.conf b/modules/rabbitmq/src/main/resources/application.conf new file mode 100644 index 000000000..656f14173 --- /dev/null +++ b/modules/rabbitmq/src/main/resources/application.conf @@ -0,0 +1,85 @@ +{ + "input": { + "type": "RabbitMQ" + "cluster": { + "ssl": false + "connectionTimeout": 5 + "internalQueueSize": 1000 + "automaticRecovery": true + "requestedHeartbeat": 100 + } + "checkpointBackoff": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + + "output": { + "good": { + "type": "RabbitMQ" + "cluster": { + "ssl": false + "connectionTimeout": 5 + "internalQueueSize": 1000 + "automaticRecovery": true + "requestedHeartbeat": 100 + } + "routingKey": "enriched" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + + "bad": { + "type": "RabbitMQ" + "cluster": { + "ssl": false + "connectionTimeout": 5 + "internalQueueSize": 1000 + "automaticRecovery": true + "requestedHeartbeat": 100 + } + "routingKey": "bad-1" + "backoffPolicy": { + "minBackoff": 100 milliseconds + "maxBackoff": 10 seconds + "maxRetries": 10 + } + } + } + + "concurrency" : { + "enrich": 256 + "sink": 3 + } + + "remoteAdapters" : { + "connectionTimeout": 10 seconds + "readTimeout": 45 seconds + "maxConnections": 10 + "configs" : [] + } + + "monitoring": { + "metrics": { + "cloudwatch": false + } + } + + "telemetry": { + "disable": false + "interval": 15 minutes + "method": POST + "collectorUri": collector-g.snowplowanalytics.com + "collectorPort": 443 + "secure": true + } + + "featureFlags" : { + "acceptInvalid": false + "legacyEnrichmentOrder": false + } +} diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala new file mode 100644 index 000000000..3ab4cffb2 --- /dev/null +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Main.scala @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.rabbitmq + +import cats.Parallel +import cats.implicits._ + +import cats.effect.{ExitCode, IO, IOApp, Resource, Sync, SyncIO} + +import java.util.concurrent.{Executors, TimeUnit} + +import scala.concurrent.ExecutionContext + +import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.common.fs2.Telemetry + +import com.snowplowanalytics.snowplow.enrich.rabbitmq.generated.BuildInfo + +object Main extends IOApp.WithContext { + + private val MaxRecordSize = 128000000 + + /** + * An execution context matching the cats effect IOApp default. We create it explicitly so we can + * also use it for our Blaze client. + */ + override protected val executionContextResource: Resource[SyncIO, ExecutionContext] = { + val poolSize = math.max(2, Runtime.getRuntime().availableProcessors()) + Resource + .make(SyncIO(Executors.newFixedThreadPool(poolSize)))(pool => + SyncIO { + pool.shutdown() + pool.awaitTermination(10, TimeUnit.SECONDS) + () + } + ) + .map(ExecutionContext.fromExecutorService) + } + + def run(args: List[String]): IO[ExitCode] = + Run.run[IO, Record[IO]]( + args, + BuildInfo.name, + """(\d.\d.\d(-\w*\d*)?)""".r.findFirstIn(BuildInfo.version).getOrElse(BuildInfo.version), + BuildInfo.description, + executionContext, + (_, cliConfig) => IO(cliConfig), + (blocker, input, _) => Source.init(blocker, input), + (blocker, out) => Sink.initAttributed(blocker, out), + (blocker, out) => Sink.initAttributed(blocker, out), + (blocker, out) => Sink.init(blocker, out), + checkpoint, + Nil, + _.data, + MaxRecordSize, + Some(Telemetry.Cloud.Gcp), + None + ) + + private def checkpoint[F[_]: Parallel: Sync](records: List[Record[F]]): F[Unit] = + records.parTraverse_(_.ack) +} diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala new file mode 100644 index 000000000..302d60850 --- /dev/null +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Record.scala @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.rabbitmq + +final case class Record[F[_]](data: Array[Byte], ack: F[Unit]) diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala new file mode 100644 index 000000000..3ef6be6af --- /dev/null +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.rabbitmq + +import cats.implicits._ +import cats.{Applicative, Parallel} + +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync, Timer} + +import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig +import dev.profunktor.fs2rabbit.model._ + +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import retry.syntax.all._ +import retry.RetryPolicies._ +import retry.RetryPolicy + +import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.{BackoffPolicy, Output} + +object Sink { + + private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = + Slf4jLogger.getLogger[F] + + def init[F[_]: ConcurrentEffect: ContextShift: Parallel: Sync: Timer]( + blocker: Blocker, + output: Output + ): Resource[F, ByteSink[F]] = + for { + sink <- initAttributed(blocker, output) + } yield records => sink(records.map(AttributedData(_, Map.empty))) + + def initAttributed[F[_]: ConcurrentEffect: ContextShift: Parallel: Sync: Timer]( + blocker: Blocker, + output: Output + ): Resource[F, AttributedByteSink[F]] = + output match { + case o: Output.RabbitMQ => + val mapped = mapConfig(o.cluster) + initSink[F](blocker, o, mapped) + case o => + Resource.eval(Sync[F].raiseError(new IllegalArgumentException(s"Output $o is not RabbitMQ"))) + } + + private def initSink[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer]( + blocker: Blocker, + rawConfig: Output.RabbitMQ, + config: Fs2RabbitConfig + ): Resource[F, AttributedByteSink[F]] = + for { + client <- Resource.eval(createClient[F](blocker, config)) + channel <- client.createConnectionChannel + publisher <- Resource.eval { + implicit val ch = channel + val exchangeName = ExchangeName(rawConfig.exchange) + client.declareExchangePassive(exchangeName) *> + client.createPublisher[String](exchangeName, RoutingKey(rawConfig.routingKey)) + } + sink = (records: List[AttributedData[Array[Byte]]]) => + records + .map(_.data) + .parTraverse_ { bytes => + publisher(new String(bytes)) + .retryingOnAllErrors( + policy = getRetryPolicy[F](rawConfig.backoffPolicy), + onError = (exception, retryDetails) => + Logger[F] + .error(exception)( + s"Writing to ${rawConfig.exchange} errored (${retryDetails.retriesSoFar} retries from cats-retry)" + ) + ) + } + } yield sink + + private def getRetryPolicy[F[_]: Applicative](backoffPolicy: BackoffPolicy): RetryPolicy[F] = + capDelay[F](backoffPolicy.maxBackoff, fullJitter[F](backoffPolicy.minBackoff)) + .join(limitRetries(backoffPolicy.maxRetries)) +} diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala new file mode 100644 index 000000000..78264ef49 --- /dev/null +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Source.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.rabbitmq + +import cats.Applicative +import cats.data.Kleisli +import cats.implicits._ + +import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Sync} + +import fs2.Stream + +import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig +import dev.profunktor.fs2rabbit.model._ +import dev.profunktor.fs2rabbit.interpreter.RabbitClient +import dev.profunktor.fs2rabbit.effects.EnvelopeDecoder + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input + +object Source { + + def init[F[_]: ConcurrentEffect: ContextShift]( + blocker: Blocker, + input: Input + ): Stream[F, Record[F]] = + input match { + case r: Input.RabbitMQ => + val mapped = mapConfig(r.cluster) + initSource[F](blocker, r, mapped) + case i => + Stream.raiseError[F](new IllegalArgumentException(s"Input $i is not RabbitMQ")) + } + + private def initSource[F[_]: ConcurrentEffect: ContextShift]( + blocker: Blocker, + rawConfig: Input.RabbitMQ, + config: Fs2RabbitConfig + ): Stream[F, Record[F]] = + for { + client <- Stream.eval[F, RabbitClient[F]](createClient[F](blocker, config)) + records <- createStreamFromClient(client, rawConfig) + } yield records + + private def createStreamFromClient[F[_]: Sync]( + client: RabbitClient[F], + rawConfig: Input.RabbitMQ + ): Stream[F, Record[F]] = + Stream.resource(client.createConnectionChannel).flatMap { implicit channel => + val queueName = QueueName(rawConfig.queue) + for { + _ <- Stream.eval(client.declareQueuePassive(queueName)) + (acker, stream) <- Stream.eval(client.createAckerConsumer[Array[Byte]](queueName)) + records <- stream.map(envelope => Record(envelope.payload, acker(AckResult.Ack(envelope.deliveryTag)))) + } yield records + } + + implicit def bytesDecoder[F[_]: Applicative]: EnvelopeDecoder[F, Array[Byte]] = + Kleisli(_.payload.pure[F]) +} diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala new file mode 100644 index 000000000..c30c07e12 --- /dev/null +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/package.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022-2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich + +import cats.effect.{Blocker, ConcurrentEffect, ContextShift} + +import dev.profunktor.fs2rabbit.config.{Fs2RabbitConfig, Fs2RabbitNodeConfig} +import dev.profunktor.fs2rabbit.interpreter.RabbitClient + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.RabbitMQConfig + +package object rabbitmq { + + def mapConfig(raw: RabbitMQConfig): Fs2RabbitConfig = + Fs2RabbitConfig( + nodes = raw.nodes.map(node => Fs2RabbitNodeConfig(node.host, node.port)), + username = Some(raw.username), + password = Some(raw.password), + virtualHost = raw.virtualHost, + ssl = raw.ssl, + connectionTimeout = raw.connectionTimeout, + requeueOnNack = false, // nack is never used in the app + requeueOnReject = false, // reject is never used in the app + internalQueueSize = Some(raw.internalQueueSize), + automaticRecovery = raw.automaticRecovery, + requestedHeartbeat = raw.requestedHeartbeat + ) + + def createClient[F[_]: ConcurrentEffect: ContextShift]( + blocker: Blocker, + config: Fs2RabbitConfig + ): F[RabbitClient[F]] = + RabbitClient[F](config, blocker) +} diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index f8932f4a6..b52e1aaa3 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -99,6 +99,14 @@ object BuildSettings { buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kinesis.generated" ) + lazy val rabbitmqProjectSettings = projectSettings ++ Seq( + name := "snowplow-enrich-rabbitmq", + moduleName := "snowplow-enrich-rabbitmq", + description := "High-performance streaming enrich app for RabbitMQ, built on top of functional streams", + buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description), + buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.rabbitmq.generated" + ) + /** Make package (build) metadata available within source code. */ lazy val scalifiedSettings = Seq( Compile / sourceGenerators += Def.task { @@ -311,6 +319,16 @@ object BuildSettings { lazy val kinesisDistrolessBuildSettings = kinesisBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless + lazy val rabbitmqBuildSettings = { + // Project + rabbitmqProjectSettings ++ buildSettings ++ + // Build and publish + assemblySettings ++ dockerSettingsFocal ++ + Seq(Docker / packageName := "snowplow-enrich-rabbitmq") + } + + lazy val rabbitmqDistrolessBuildSettings = rabbitmqBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless + /** Fork a JVM per test in order to not reuse enrichment registries. */ def oneJVMPerTest(tests: Seq[TestDefinition]): Seq[Tests.Group] = tests.map(t => Tests.Group(t.name, Seq(t), Tests.SubProcess(ForkOptions()))) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 132ca3d77..ac6a53460 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -87,6 +87,7 @@ object Dependencies { val catsRetry = "2.1.0" val specsDiff = "0.6.0" val eventGen = "0.2.0" + val fs2RabbitMQ = "3.0.1" // latest version without CE3 val scopt = "3.7.1" val pureconfig = "0.11.0" @@ -203,6 +204,7 @@ object Dependencies { val http4sServer = "org.http4s" %% "http4s-blaze-server" % V.http4s % Test val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.snowplowTracker val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.snowplowTracker + val fs2RabbitMQ = "dev.profunktor" %% "fs2-rabbit" % V.fs2RabbitMQ // compiler plugins val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor @@ -329,6 +331,10 @@ object Dependencies { sts, specs2 ) + + val rabbitmqDependencies = Seq( + fs2RabbitMQ + ) // exclusions val exclusions = Seq(