diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1c5963cc9..a8278d1df 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,6 +49,13 @@ jobs: run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test env: OER_KEY: ${{ secrets.OER_KEY }} + - name: Run integration tests + id: integrationTest + run: | + sbt "project kafka" "docker:publishLocal" + docker-compose -f integration-tests/enrich-kafka/docker-compose.yml up -d + sbt "project kafka" "it:test" + docker-compose -f integration-tests/enrich-kafka/docker-compose.yml down - name: Check Scala formatting if: ${{ always() }} run: sbt scalafmtCheckAll @@ -85,7 +92,8 @@ jobs: "project streamNsq; set assembly / test := {}; assembly" \ "project pubsub; set assembly / test := {}; assembly" \ "project kinesis; set assembly / test := {}; assembly" \ - "project rabbitmq; set assembly / test := {}; assembly" + "project rabbitmq; set assembly / test := {}; assembly" \ + "project kafka; set assembly / test := {}; assembly" - name: Create GitHub release and attach artifacts uses: softprops/action-gh-release@v1 with: @@ -100,6 +108,7 @@ jobs: modules/pubsub/target/scala-2.12/snowplow-enrich-pubsub-${{ steps.ver.outputs.tag }}.jar modules/kinesis/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar modules/rabbitmq/target/scala-2.12/snowplow-enrich-rabbitmq-${{ steps.ver.outputs.tag }}.jar + modules/kafka/target/scala-2.12/snowplow-enrich-kafka-${{ steps.ver.outputs.tag }}.jar env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -115,6 +124,7 @@ jobs: - streamNsq - pubsub - kinesis + - kafka - rabbitmq include: - suffix: "" @@ -231,4 +241,3 @@ jobs: PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }} SONATYPE_USERNAME: ${{ secrets.SONA_USER }} SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }} - diff --git a/.github/workflows/lacework.yml b/.github/workflows/lacework.yml index 801d8d397..2e4536712 100644 --- a/.github/workflows/lacework.yml +++ b/.github/workflows/lacework.yml @@ -101,3 +101,31 @@ jobs: LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }} LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }} run: ./lw-scanner image evaluate snowplow/stream-enrich-nsq ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull + + - name: Scan enrich-kafka + env: + LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }} + LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }} + LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }} + run: ./lw-scanner image evaluate snowplow/snowplow-enrich-kafka ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull + + - name: Scan enrich-kafka distroless + env: + LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }} + LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }} + LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }} + run: ./lw-scanner image evaluate snowplow/snowplow-enrich-kafka ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull + + - name: Scan enrich-rabbitmq + env: + LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }} + LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }} + LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }} + run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull + + - name: Scan enrich-rabbitmq distroless + env: + LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }} + LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }} + 
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }} + run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull diff --git a/build.sbt b/build.sbt index ba3e8806f..430ab5ff7 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ lazy val root = project.in(file(".")) .settings(projectSettings) .settings(compilerSettings) .settings(resolverSettings) - .aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, rabbitmq, rabbitmqDistroless) + .aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, kafka, kafkaDistroless, rabbitmq, rabbitmqDistroless) lazy val common = project .in(file("modules/common")) @@ -151,6 +151,31 @@ lazy val kinesisDistroless = project .settings(addCompilerPlugin(betterMonadicFor)) .dependsOn(commonFs2) +lazy val kafka = project + .in(file("modules/kafka")) + .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin) + .settings(kafkaBuildSettings) + .settings(libraryDependencies ++= kafkaDependencies ++ Seq( + // integration test dependencies + specs2CEIt, + specs2ScalacheckIt + )) + .settings(excludeDependencies ++= exclusions) + .settings(Defaults.itSettings) + .configs(IntegrationTest) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2) + +lazy val kafkaDistroless = project + .in(file("modules/distroless/kafka")) + .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin, LauncherJarPlugin) + .settings(sourceDirectory := (kafka / sourceDirectory).value) + .settings(kafkaDistrolessBuildSettings) + .settings(libraryDependencies ++= kafkaDependencies) + .settings(excludeDependencies ++= exclusions) + .settings(addCompilerPlugin(betterMonadicFor)) + .dependsOn(commonFs2) + lazy val bench = project .in(file("modules/bench")) .dependsOn(pubsub % "test->test") diff --git a/config/config.kafka.extended.hocon b/config/config.kafka.extended.hocon new file mode 100644 index 000000000..0fa220fc4 --- /dev/null +++ b/config/config.kafka.extended.hocon @@ -0,0 +1,236 @@ +{ + # Where to read collector payloads from + "input": { + "type": "Kafka" + + # Name of the Kafka topic to read from + "topicName": "collector-payloads" + + # A list of host:port pairs to use for establishing the initial connection to the Kafka cluster + # This list should be in the form host1:port1,host2:port2,... + "bootstrapServers": "localhost:9092" + + # Optional, Kafka Consumer configuration + # See https://kafka.apache.org/documentation/#consumerconfigs for all properties + "consumerConf": { + "auto.offset.reset" : "earliest" + "session.timeout.ms": "45000" + } + } + + "output": { + # Enriched events output + "good": { + "type": "Kafka" + + # Name of the Kafka topic to write to + "topicName": "enriched" + + # A list of host:port pairs to use for establishing the initial connection to the Kafka cluster + # This list should be in the form host1:port1,host2:port2,... + "bootstrapServers": "localhost:9092" + + # Optional, Kafka producer configuration + # See https://kafka.apache.org/documentation/#producerconfigs for all properties + "producerConf": { + "acks": "all" + } + + # Optional. 
Enriched event field to use as Kafka partition key + "partitionKey": "app_id" + + # Optional. Enriched event fields to add as Kafka record headers + "headers": [ "app_id" ] + } + + # Optional. Pii events output. Should be omitted if pii events are not emitted + "pii": { + "type": "Kafka" + + # Name of the Kafka topic to write to + "topicName": "pii" + + # A list of host:port pairs to use for establishing the initial connection to the Kafka cluster + # This list should be in the form host1:port1,host2:port2,... + "bootstrapServers": "localhost:9092" + + # Optional, Kafka producer configuration + # See https://kafka.apache.org/documentation/#producerconfigs for all properties + "producerConf": { + "acks": "all" + } + + # Optional. Enriched event field to use as Kafka partition key + "partitionKey": "app_id" + + # Optional. Enriched event fields to add as Kafka record headers + "headers": [ "app_id" ] + } + + # Bad rows output + "bad": { + "type": "Kafka" + + # Name of the Kafka topic to write to + "topicName": "bad" + + # A list of host:port pairs to use for establishing the initial connection to the Kafka cluster + # This list should be in the form host1:port1,host2:port2,... + "bootstrapServers": "localhost:9092" + + # Optional, Kafka producer configuration + # See https://kafka.apache.org/documentation/#producerconfigs for all properties + "producerConf": { + "acks": "all" + } + } + } + + # Optional. Concurrency of the app + "concurrency" : { + # Number of events that can get enriched at the same time within a chunk + "enrich": 256 + # Number of chunks that can get sunk at the same time + # WARNING: if greater than 1, records can get checkpointed before they are sunk + "sink": 1 + } + + # Optional, period after which enrich assets should be checked for updates + # no assets will be updated if the key is absent + "assetsUpdatePeriod": "7 days" + + # Optional, configuration of remote adapters + "remoteAdapters": { + # how long enrich waits to establish a connection to remote adapters + "connectionTimeout": "10 seconds", + # how long enrich waits to get a response from remote adapters + "readTimeout": "45 seconds", + # how many connections enrich opens at maximum for remote adapters + # increasing this could help with throughput in case of adapters with high latency + "maxConnections": 10, + # a list of remote adapter configs + "configs": [ + { + "vendor": "com.example", + "version": "v1", + "url": "https://remote-adapter.com" + } + ] + } + + "monitoring": { + + # Optional, for tracking runtime exceptions + "sentry": { + "dsn": "http://sentry.acme.com" + } + + # Optional, configure how metrics are reported + "metrics": { + + # Optional. Send metrics to a StatsD server on localhost + "statsd": { + "hostname": "localhost" + "port": 8125 + + # Required, how frequently to report metrics + "period": "10 seconds" + + # Any key-value pairs to be tagged on every StatsD metric + "tags": { + "app": enrich + } + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. Log to stdout using Slf4j + "stdout": { + "period": "10 seconds" + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. 
Send KCL and KPL metrics to Cloudwatch + "cloudwatch": true + } + } + + # Optional, configure telemetry + # All the fields are optional + "telemetry": { + + # Set to true to disable telemetry + "disable": false + + # Interval for the heartbeat event + "interval": 15 minutes + + # HTTP method used to send the heartbeat event + "method": POST + + # URI of the collector receiving the heartbeat event + "collectorUri": collector-g.snowplowanalytics.com + + # Port of the collector receiving the heartbeat event + "collectorPort": 443 + + # Whether to use https or not + "secure": true + + # Identifier intended to tie events together across modules, + # infrastructure and apps when used consistently + "userProvidedId": my_pipeline + + # ID automatically generated upon running a modules deployment script + # Intended to identify each independent module, and the infrastructure it controls + "autoGeneratedId": hfy67e5ydhtrd + + # Unique identifier for the VM instance + # Unique for each instance of the app running within a module + "instanceId": 665bhft5u6udjf + + # Name of the terraform module that deployed the app + "moduleName": enrich-kafka-ce + + # Version of the terraform module that deployed the app + "moduleVersion": 1.0.0 + } + + # Optional. To activate/deactivate enrich features that are still in beta + # or that are here for transition. + # This section might change in future versions + "featureFlags" : { + + # Enrich 3.0.0 introduces the validation of the enriched events against atomic schema + # before emitting. + # If set to false, a bad row will be emitted instead of the enriched event + # if validation fails. + # If set to true, invalid enriched events will be emitted, as before. + # WARNING: this feature flag will be removed in a future version + # and it will become impossible to emit invalid enriched events. + # More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690 + "acceptInvalid": false + + # In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment + # incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true + # to keep the erroneous behaviour of those previous versions. This flag will be removed in a + # future version. + # More details: https://github.com/snowplow/enrich/issues/619 + "legacyEnrichmentOrder": false + } + + # Optional. Configuration for experimental/preview features + "experimental": { + # Whether to export metadata using a webhook URL. + # Follows iglu-webhook protocol. 
+ "metadata": { + "endpoint": "https://my_pipeline.my_domain.com/iglu" + "interval": 5 minutes + "organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455" + "pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784" + } + } +} diff --git a/config/config.kafka.minimal.hocon b/config/config.kafka.minimal.hocon new file mode 100644 index 000000000..6b9078507 --- /dev/null +++ b/config/config.kafka.minimal.hocon @@ -0,0 +1,18 @@ +{ + "input": { + "topicName": "collector-payloads" + "bootstrapServers": "localhost:9092" + } + + "output": { + "good": { + "topicName": "enriched" + "bootstrapServers": "localhost:9092" + } + + "bad": { + "topicName": "bad" + "bootstrapServers": "localhost:9092" + } + } +} diff --git a/integration-tests/enrich-kafka/config/enrich-kafka.hocon b/integration-tests/enrich-kafka/config/enrich-kafka.hocon new file mode 100644 index 000000000..349ec60c8 --- /dev/null +++ b/integration-tests/enrich-kafka/config/enrich-kafka.hocon @@ -0,0 +1,58 @@ +{ + "input": { + "type": "Kafka" + "topicName": "it-enrich-kinesis-collector-payloads" + "bootstrapServers": "broker:29092" + "consumerConf": { + "enable.auto.commit": "false" + "auto.offset.reset" : "earliest" + } + } + + "output": { + "good": { + "type": "Kafka" + "topicName": "it-enrich-kinesis-enriched" + "bootstrapServers": "broker:29092" + "partitionKey": "app_id" + "headers": ["app_id"] + } + + "bad": { + "type": "Kafka" + "topicName": "it-enrich-kinesis-bad" + "bootstrapServers": "broker:29092" + } + } + + + "monitoring": { + + "sentry": { + "dsn": "" + } + + # Optional, configure how metrics are reported + "metrics": { + + # Optional. Log to stdout using Slf4j + "stdout": { + "period": "10 seconds" + + # Optional, override the default metric prefix + # "prefix": "snowplow.enrich." + } + + # Optional. 
Send KCL and KPL metrics to Cloudwatch + "cloudwatch": false + } + } + + "telemetry": { + "disable": true + } + + "featureFlags": { + "acceptInvalid": true + } +} diff --git a/integration-tests/enrich-kafka/config/enrichments/yauaa_enrichment_config.json b/integration-tests/enrich-kafka/config/enrichments/yauaa_enrichment_config.json new file mode 100644 index 000000000..058b63412 --- /dev/null +++ b/integration-tests/enrich-kafka/config/enrichments/yauaa_enrichment_config.json @@ -0,0 +1,9 @@ +{ + "schema": "iglu:com.snowplowanalytics.snowplow.enrichments/yauaa_enrichment_config/jsonschema/1-0-0", + "data": { + "vendor": "com.snowplowanalytics.snowplow.enrichments", + "name": "yauaa_enrichment_config", + "enabled": true + } +} + diff --git a/integration-tests/enrich-kafka/config/iglu_resolver.json b/integration-tests/enrich-kafka/config/iglu_resolver.json new file mode 100644 index 000000000..b9bf079af --- /dev/null +++ b/integration-tests/enrich-kafka/config/iglu_resolver.json @@ -0,0 +1,28 @@ +{ + "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1", + "data": { + "cacheSize": 500, + "repositories": [ + { + "name": "Iglu Central", + "priority": 0, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://iglucentral.com" + } + } + }, + { + "name": "Iglu Central - GCP Mirror", + "priority": 1, + "vendorPrefixes": [ "com.snowplowanalytics" ], + "connection": { + "http": { + "uri": "http://mirror01.iglucentral.com" + } + } + } + ] + } +} diff --git a/integration-tests/enrich-kafka/docker-compose.yml b/integration-tests/enrich-kafka/docker-compose.yml new file mode 100644 index 000000000..4a38930ad --- /dev/null +++ b/integration-tests/enrich-kafka/docker-compose.yml @@ -0,0 +1,45 @@ +--- +version: '3' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.0.1 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.0.1 + container_name: broker + ports: + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + + enrich: + image: snowplow/snowplow-enrich-kafka:latest + container_name: enrich-kafka + depends_on: + - broker + command: [ + "--config", "/snowplow/config/enrich-kafka.hocon", + "--iglu-config", "/snowplow/config/iglu_resolver.json", + "--enrichments", "/snowplow/config/enrichments" + ] + restart: always + volumes: + - ./config:/snowplow/config/ + logging: + options: + max-size: "10M" + max-file: "10" + environment: + - "JAVA_OPTS=-Xmx1G -Dlog4j2.formatMsgNoLookups=true" diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AttributedData.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AttributedData.scala index 18abf9a98..a062b5ad5 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AttributedData.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AttributedData.scala @@ -12,5 +12,15 @@ */ package com.snowplowanalytics.snowplow.enrich.common.fs2 -/** Data intended to be sent to a 
sink, plus key-value pairs to be added to the message as attributes */ -final case class AttributedData[A](data: A, attributes: Map[String, String]) +/** + * Represents a payload, partition key and attributes of the payload + * @param data payload to be sent to the sink + * @param partitionKey field name to be used as partition key, supported by Kinesis and Kafka + * @param attributes key-value pairs to be added to the message as attributes, supported by PubSub and Kafka + * @tparam A type of the payload + */ +final case class AttributedData[A]( + data: A, + partitionKey: String, + attributes: Map[String, String] +) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala index 2b75f1766..7094aff3a 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Enrich.scala @@ -195,7 +195,7 @@ object Enrich { val (moreBad, good) = enriched.map { e => serializeEnriched(e, env.processor, env.streamsSettings.maxRecordSize) - .map(bytes => (e, AttributedData(bytes, env.goodAttributes(e)))) + .map(bytes => (e, AttributedData(bytes, env.goodPartitionKey(e), env.goodAttributes(e)))) }.separate val allBad = (bad ++ moreBad).map(badRowResize(env, _)) @@ -207,6 +207,7 @@ object Enrich { env.metrics.goodCount, env.metadata.observe, env.sinkPii, + env.piiPartitionKey, env.piiAttributes, env.processor, env.streamsSettings.maxRecordSize @@ -221,13 +222,20 @@ object Enrich { incMetrics: Int => F[Unit], metadata: List[EnrichedEvent] => F[Unit], piiSink: Option[AttributedByteSink[F]], + piiPartitionKey: EnrichedEvent => String, piiAttributes: EnrichedEvent => Map[String, String], processor: Processor, maxRecordSize: Int ): F[Unit] = { val enriched = good.map(_._1) val serialized = good.map(_._2) - sink(serialized) *> incMetrics(good.size) *> metadata(enriched) *> sinkPii(enriched, piiSink, piiAttributes, processor, maxRecordSize) + sink(serialized) *> incMetrics(good.size) *> metadata(enriched) *> sinkPii(enriched, + piiSink, + piiPartitionKey, + piiAttributes, + processor, + maxRecordSize + ) } def sinkBad[F[_]: Monad]( @@ -240,6 +248,7 @@ object Enrich { def sinkPii[F[_]: Sync]( enriched: List[EnrichedEvent], maybeSink: Option[AttributedByteSink[F]], + partitionKey: EnrichedEvent => String, attributes: EnrichedEvent => Map[String, String], processor: Processor, maxRecordSize: Int @@ -249,7 +258,7 @@ object Enrich { val (bad, serialized) = enriched .flatMap(ConversionUtils.getPiiEvent(processor, _)) - .map(e => serializeEnriched(e, processor, maxRecordSize).map(AttributedData(_, attributes(e)))) + .map(e => serializeEnriched(e, processor, maxRecordSize).map(AttributedData(_, partitionKey(e), attributes(e)))) .separate val logging = if (bad.nonEmpty) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index 7cfa39cf9..eb237de6f 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -81,6 +81,8 @@ import java.util.concurrent.TimeoutException * @param metrics common counters * @param metadata metadata 
aggregations * @param assetsUpdatePeriod time after which enrich assets should be refresh + * @param goodPartitionKey field from an enriched event to use as output partition key + * @param piiPartitionKey field from a PII event to use as output partition key * @param goodAttributes fields from an enriched event to use as output message attributes * @param piiAttributes fields from a PII event to use as output message attributes * @param telemetryConfig configuration for telemetry @@ -112,6 +114,8 @@ final case class Environment[F[_], A]( metrics: Metrics[F], metadata: Metadata[F], assetsUpdatePeriod: Option[FiniteDuration], + goodPartitionKey: EnrichedEvent => String, + piiPartitionKey: EnrichedEvent => String, goodAttributes: EnrichedEvent => Map[String, String], piiAttributes: EnrichedEvent => Map[String, String], telemetryConfig: TelemetryConfig, @@ -210,6 +214,8 @@ object Environment { metrics, metadata, file.assetsUpdatePeriod, + parsedConfigs.goodPartitionKey, + parsedConfigs.piiPartitionKey, parsedConfigs.goodAttributes, parsedConfigs.piiAttributes, file.telemetry, diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala index 98bdf6885..926985ce1 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ConfigFile.scala @@ -78,6 +78,9 @@ object ConfigFile { // Remove pii output if topic empty case c @ ConfigFile(_, Outputs(good, Some(Output.PubSub(t, _, _, _, _)), bad), _, _, _, _, _, _, _) if t.isEmpty => c.copy(output = Outputs(good, None, bad)).asRight + // Remove pii output if topic empty + case c @ ConfigFile(_, Outputs(good, Some(Output.Kafka(topicName, _, _, _, _)), bad), _, _, _, _, _, _, _) if topicName.isEmpty => + c.copy(output = Outputs(good, None, bad)).asRight case other => other.asRight } implicit val configFileEncoder: Encoder[ConfigFile] = diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala index c812d1d1e..8c7473249 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigs.scala @@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.config import java.lang.reflect.Field +import java.util.UUID import _root_.io.circe.Json import _root_.io.circe.syntax._ @@ -44,6 +45,8 @@ final case class ParsedConfigs( igluJson: Json, enrichmentConfigs: List[EnrichmentConf], configFile: ConfigFile, + goodPartitionKey: EnrichedEvent => String, + piiPartitionKey: EnrichedEvent => String, goodAttributes: EnrichedEvent => Map[String, String], piiAttributes: EnrichedEvent => Map[String, String] ) @@ -77,6 +80,8 @@ object ParsedConfigs { _ <- EitherT.liftF( Logger[F].info(s"Parsed config file: ${configFile}") ) + goodPartitionKey = outputPartitionKey(configFile.output.good) + piiPartitionKey = configFile.output.pii.map(outputPartitionKey).getOrElse { _: EnrichedEvent => "" } goodAttributes = outputAttributes(configFile.output.good) piiAttributes = 
configFile.output.pii.map(outputAttributes).getOrElse { _: EnrichedEvent => Map.empty[String, String] } resolverConfig <- @@ -90,7 +95,7 @@ object ParsedConfigs { show"Cannot decode enrichments - ${x.mkString_(", ")}" } _ <- EitherT.liftF(Logger[F].info(show"Parsed following enrichments: ${configs.map(_.schemaKey.name).mkString(", ")}")) - } yield ParsedConfigs(igluJson, configs, configFile, goodAttributes, piiAttributes) + } yield ParsedConfigs(igluJson, configs, configFile, goodPartitionKey, piiPartitionKey, goodAttributes, piiAttributes) private[config] def validateConfig[F[_]: Applicative](configFile: ConfigFile): EitherT[F, String, ConfigFile] = { val goodCheck: ValidationResult[OutputConfig] = validateAttributes(configFile.output.good) @@ -114,31 +119,52 @@ object ParsedConfigs { } case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) if !enrichedFieldsMap.contains(key) => NonEmptyList.one(s"Partition key $key not valid").invalid + case ka: OutputConfig.Kafka if !ka.headers.forall(enrichedFieldsMap.contains) => + NonEmptyList + .one( + s"Fields [${ka.headers.filterNot(enrichedFieldsMap.contains).mkString(", ")}] for headers are not part of the enriched event" + ) + .invalid case _ => output.valid } private[config] def outputAttributes(output: OutputConfig): EnrichedEvent => Map[String, String] = output match { - case OutputConfig.PubSub(_, Some(attributes), _, _, _) => - val fields = ParsedConfigs.enrichedFieldsMap.filter { - case (s, _) => - attributes.contains(s) - } - attributesFromFields(fields) - case OutputConfig.Kinesis(_, _, Some(key), _, _, _, _, _) => - val fields = ParsedConfigs.enrichedFieldsMap.filter { - case (s, _) => - s == key - } - attributesFromFields(fields) - case _ => - _ => Map.empty + case OutputConfig.PubSub(_, Some(attributes), _, _, _) => attributesFromFields(attributes) + case OutputConfig.Kafka(_, _, _, headers, _) => attributesFromFields(headers) + case _ => _ => Map.empty } - private def attributesFromFields(fields: Map[String, Field])(ee: EnrichedEvent): Map[String, String] = - fields.flatMap { - case (k, f) => - Option(f.get(ee)).map(v => k -> v.toString) + private[config] def attributesFromFields(attributes: Set[String]): EnrichedEvent => Map[String, String] = { + val fields = ParsedConfigs.enrichedFieldsMap.filter { + case (s, _) => + attributes.contains(s) + } + (ee: EnrichedEvent) => + fields.flatMap { + case (k, f) => + Option(f.get(ee)).map(v => k -> v.toString) + } + } + + private[config] def outputPartitionKey(output: OutputConfig): EnrichedEvent => String = + output match { + case OutputConfig.Kafka(_, _, partitionKey, _, _) => partitionKeyFromFields(partitionKey) + case OutputConfig.Kinesis(_, _, Some(partitionKey), _, _, _, _, _) => partitionKeyFromFields(partitionKey) + case _ => _ => UUID.randomUUID().toString } + + private[config] def partitionKeyFromFields(partitionKey: String): EnrichedEvent => String = + ParsedConfigs.enrichedFieldsMap.get(partitionKey).fold[EnrichedEvent => String](_ => UUID.randomUUID().toString) { f => ee => + Option(f.get(ee)).fold(UUID.randomUUID().toString)(_.toString) + } + + /** Checks if partitionKey is a valid enriched event field name */ + private[config] def isValidPartitionKey(partitionKey: String): Boolean = + ParsedConfigs.enrichedFieldsMap.contains(partitionKey) + + /** Filters attributes' members which are not valid enriched event field names */ + private[config] def filterInvalidAttributes(attributes: Set[String]): Set[String] = + attributes.filterNot(ParsedConfigs.enrichedFieldsMap.contains) } 
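For illustration, a minimal sketch of how the new partition-key and header helpers above behave, modelled on the ParsedConfigsSpec additions further down in this patch. The object name and field values are only examples, and since the helpers are private[config] the sketch assumes it lives in the same package:

package com.snowplowanalytics.snowplow.enrich.common.fs2.config

import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent

object PartitionKeyExample {
  def main(args: Array[String]): Unit = {
    // Kafka output partitioned by app_id, with app_id also attached as a record header
    val output = io.Output.Kafka("enriched", "localhost:9092", "app_id", Set("app_id"), Map.empty)

    val ee = new EnrichedEvent()
    ee.app_id = "my_app"

    // "my_app"; falls back to a random UUID when the configured field is absent or null
    val partitionKey = ParsedConfigs.outputPartitionKey(output)(ee)

    // Map("app_id" -> "my_app"); the Kafka sink attaches these as record headers
    val headers = ParsedConfigs.outputAttributes(output)(ee)

    println(s"$partitionKey -> $headers")
  }
}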
diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala index a811003b8..52b4b866b 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/io.scala @@ -105,6 +105,12 @@ object io { object Input { + case class Kafka private ( + topicName: String, + bootstrapServers: String, + consumerConf: Map[String, String] + ) extends Input + case class PubSub private ( subscription: String, parallelPullCount: Int, @@ -215,6 +221,8 @@ object io { case _ => s"Subscription must conform projects/project-name/subscriptions/subscription-name format, $s given".asLeft } + case Kafka(topicName, bootstrapServers, _) if topicName.isEmpty ^ bootstrapServers.isEmpty => + "Both topicName and bootstrapServers have to be set".asLeft case other => other.asRight } .emap { @@ -246,6 +254,15 @@ object io { sealed trait Output object Output { + + case class Kafka private ( + topicName: String, + bootstrapServers: String, + partitionKey: String, + headers: Set[String], + producerConf: Map[String, String] + ) extends Output + case class PubSub private ( topic: String, attributes: Option[Set[String]], @@ -284,6 +301,8 @@ object io { implicit val outputDecoder: Decoder[Output] = deriveConfiguredDecoder[Output] .emap { + case Kafka(topicName, bootstrapServers, _, _, _) if topicName.isEmpty ^ bootstrapServers.isEmpty => + "Both topicName and bootstrapServers have to be set".asLeft case s @ PubSub(top, _, _, _, _) if top.nonEmpty => top.split("/").toList match { case List("projects", _, "topics", _) => @@ -296,6 +315,12 @@ object io { case other => other.asRight } .emap { + case Kafka(_, _, pk, _, _) if pk.nonEmpty && !ParsedConfigs.isValidPartitionKey(pk) => + s"Kafka partition key [$pk] is invalid".asLeft + case ka: Kafka if ka.headers.nonEmpty => + val invalidAttrs = ParsedConfigs.filterInvalidAttributes(ka.headers) + if (invalidAttrs.nonEmpty) s"Kafka headers [${invalidAttrs.mkString(",")}] are invalid".asLeft + else ka.asRight case p: PubSub if p.delayThreshold < Duration.Zero => "PubSub delay threshold cannot be less than 0".asLeft case p: PubSub if p.maxBatchSize < 0 => diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala index 4bf5f5e9e..b0d17ebc7 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/EnrichSpec.scala @@ -207,8 +207,9 @@ class EnrichSpec extends Specification with CatsIO with ScalaCheck { bad <- test.bad } yield { good should beLike { - case Vector(AttributedData(bytes, attrs)) => + case Vector(AttributedData(bytes, pk, attrs)) => bytes must not be empty + pk must beEqualTo("test_good_partition_key") attrs must contain(exactly("app_id" -> "test_app")) } @@ -235,14 +236,16 @@ class EnrichSpec extends Specification with CatsIO with ScalaCheck { } yield { good should beLike { - case Vector(AttributedData(bytes, attrs)) => + case Vector(AttributedData(bytes, pk, attrs)) => bytes must not be empty + pk must beEqualTo("test_good_partition_key") attrs must contain(exactly("app_id" -> "test_app")) } pii 
should beLike { - case Vector(AttributedData(bytes, attrs)) => + case Vector(AttributedData(bytes, pk, attrs)) => bytes must not be empty + pk must beEqualTo("test_pii_partition_key") attrs must contain(exactly("platform" -> "srv")) } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala index 2bbe81057..30d355e4e 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/config/ParsedConfigsSpec.scala @@ -106,4 +106,40 @@ class ParsedConfigsSpec extends Specification with CatsIO { result must haveValue("test_app") } } + + "attributesFromFields" should { + "fetch attribute values" in { + val ee = new EnrichedEvent() + ee.app_id = "test_attributesFromFields" + + val result = ParsedConfigs.attributesFromFields(Set("app_id"))(ee) + + result must haveSize(1) + result must haveKey("app_id") + result must haveValue("test_attributesFromFields") + } + } + + "outputPartitionKey" should { + "fetch partition key's value" in { + val output = io.Output.Kafka("good-topic", "localhost:9092", "app_id", Set(), Map.empty) + val ee = new EnrichedEvent() + ee.app_id = "test_outputPartitionKey" + + val result = ParsedConfigs.outputPartitionKey(output)(ee) + + result must beEqualTo("test_outputPartitionKey") + } + } + + "partitionKeyFromFields" should { + "fetch partition key's value" in { + val ee = new EnrichedEvent() + ee.app_id = "test_partitionKeyFromFields" + + val result = ParsedConfigs.partitionKeyFromFields("app_id")(ee) + + result must beEqualTo("test_partitionKeyFromFields") + } + } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index 51e73c887..779fd8df8 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -164,6 +164,8 @@ object TestEnvironment extends CatsIO { metrics, metadata, None, + _ => "test_good_partition_key", + _ => "test_pii_partition_key", _ => Map.empty, _ => Map.empty, Telemetry(true, 1.minute, "POST", "foo.bar", 1234, true, None, None, None, None, None), diff --git a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/CollectorPayloadGen.scala b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/CollectorPayloadGen.scala new file mode 100644 index 000000000..48ba4831b --- /dev/null +++ b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/CollectorPayloadGen.scala @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.kafka.test + +import fs2.Stream + +import cats.implicits._ + +import cats.effect.Sync + +import io.circe.syntax._ +import io.circe.{Encoder, Json, JsonObject} + +import org.scalacheck.{Arbitrary, Gen} + +import org.joda.time.DateTime + +import org.apache.thrift.TSerializer + +import java.util.Base64 + +import com.snowplowanalytics.iglu.core.{ SelfDescribingData, SchemaKey, SchemaVer } +import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._ + +import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload + +object CollectorPayloadGen { + + private val serializer = new TSerializer() + private val base64Encoder = Base64.getEncoder() + + def generate[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, Array[Byte]] = + generateRaw(nbGoodEvents, nbBadRows).map(_.toThrift).map(serializer.serialize) + + def generateRaw[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, CollectorPayload] = + Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++ Stream.repeatEval(runGen(collectorPayloadGen(false))).take(nbBadRows) + + private def collectorPayloadGen(valid: Boolean): Gen[CollectorPayload] = + for { + vendor <- Gen.const("com.snowplowanalytics.snowplow") + version <- Gen.const("tp2") + api = CollectorPayload.Api(vendor, version) + + queryString = Nil + + contentType = Some("application/json") + + body <- bodyGen(valid).map(Some(_)) + + name = "scala-tracker_1.0.0" + encoding = "UTF8" + hostname = Some("example.acme") + source = CollectorPayload.Source(name, encoding, hostname) + + timestamp <- Gen.option(DateTime.now) + ipAddress <- Gen.option(ipAddressGen) + useragent <- Gen.option(userAgentGen) + refererUri = None + headers = Nil + userId <- Gen.uuid.map(Some(_)) + context = CollectorPayload.Context(timestamp, ipAddress, useragent, refererUri, headers, userId) + } yield CollectorPayload(api, queryString, contentType, body, source, context) + + private def bodyGen(valid: Boolean): Gen[String] = + for { + p <- Gen.oneOf("web", "mob", "app").withKey("p") + aid <- Gen.const("enrich-kinesis-integration-tests").withKey("aid") + e <- Gen.const("ue").withKey("e") + tv <- Gen.oneOf("scala-tracker_1.0.0", "js_2.0.0", "go_1.2.3").withKey("tv") + uePx <- + if(valid) + ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px") + else + Gen.const("foo").withKey("ue_px") + } yield SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1,0,4)), + List(asObject(List(p, aid, e, uePx, tv))).asJson + ).asJson.toString + + private def ueGen = + for { + sdj <- Gen.oneOf(changeFormGen, clientSessionGen) + } yield SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1,0,0)), + sdj.asJson + ).asJson + + + private def changeFormGen = + for { + formId <- strGen(32, Gen.alphaNumChar).withKey("formId") + elementId <- strGen(32, Gen.alphaNumChar).withKey("elementId") + nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName") + `type` <- Gen.option(Gen.oneOf(List("button", "checkbox", "color", 
"date", "datetime", "datetime-local", "email", "file", "hidden", "image", "month", "number", "password", "radio", "range", "reset", "search", "submit", "tel", "text", "time", "url", "week"))).withKeyOpt("type") + value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value") + } yield SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1,0,0)), + asObject(List(formId, elementId, nodeName, `type`, value)) + ) + + private def clientSessionGen = + for { + userId <- Gen.uuid.withKey("userId") + sessionId <- Gen.uuid.withKey("sessionId") + sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex") + previousSessionId <- Gen.option(Gen.uuid).withKeyNull("previousSessionId") + storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism") + } yield SelfDescribingData( + SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1,0,1)), + asObject(List(userId, sessionId, sessionIndex, previousSessionId, storageMechanism)) + ) + + private def strGen(n: Int, gen: Gen[Char]): Gen[String] = + Gen.chooseNum(1, n).flatMap(len => Gen.listOfN(len, gen).map(_.mkString)) + + private def ipAddressGen = Gen.oneOf(ipv4AddressGen, ipv6AddressGen) + + private def ipv4AddressGen = + for { + a <- Gen.chooseNum(0, 255) + b <- Gen.chooseNum(0, 255) + c <- Gen.chooseNum(0, 255) + d <- Gen.chooseNum(0, 255) + } yield s"$a.$b.$c.$d" + + private def ipv6AddressGen = + for { + a <- Arbitrary.arbitrary[Short] + b <- Arbitrary.arbitrary[Short] + c <- Arbitrary.arbitrary[Short] + d <- Arbitrary.arbitrary[Short] + e <- Arbitrary.arbitrary[Short] + f <- Arbitrary.arbitrary[Short] + g <- Arbitrary.arbitrary[Short] + h <- Arbitrary.arbitrary[Short] + } yield f"$a%x:$b%x:$c%x:$d%x:$e%x:$f%x:$g%x:$h%x" + + private def userAgentGen: Gen[String] = + Gen.oneOf( + "Mozilla/5.0 (iPad; CPU OS 6_1_3 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10B329 Safari/8536.25", + "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1", + "Mozilla/5.0 (Linux; U; Android 2.2; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1", + "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.112 Safari/535.1", + "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0)", + "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" + ) + + // Helpers to control null/absence + + private def asObject(fields: List[Option[(String, Json)]]): Json = + JsonObject.fromIterable(fields.collect { case Some(field) => field }).asJson + + implicit class GenOps[A](gen: Gen[A]) { + def withKey[B](name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] = + gen.map { a => Some((name -> a.asJson)) } + } + + implicit class GenOptOps[A](gen: Gen[Option[A]]) { + def withKeyOpt(name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] = + gen.map { + case Some(a) => Some((name -> a.asJson)) + case None => None + } + + def withKeyNull(name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] = + gen.map { + case Some(a) => Some((name -> a.asJson)) + case None => Some((name -> Json.Null)) + } + } + + /** Convert `Gen` into `IO` */ + def runGen[F[_]: Sync, A](gen: Gen[A]): F[A] = { + val MAX_ATTEMPTS = 5 + def go(attempt: Int): F[A] = + if (attempt >= MAX_ATTEMPTS) + 
Sync[F].raiseError(new RuntimeException(s"Couldn't generate an event after $MAX_ATTEMPTS attempts")) + else + Sync[F].delay(gen.sample).flatMap { + case Some(a) => Sync[F].pure(a) + case None => go(attempt + 1) + } + go(1) + } +} diff --git a/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala new file mode 100644 index 000000000..4a3e555c1 --- /dev/null +++ b/modules/kafka/src/it/scala/com/snowplowanalytics/snowplow/enrich/kafka/test/EnrichKafkaSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.kafka +package test + +import scala.concurrent.duration._ +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger +import cats.effect.{Blocker, IO} +import cats.effect.concurrent.Ref +import fs2.Stream +import org.specs2.mutable.Specification +import cats.effect.testing.specs2.CatsIO +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input.{Kafka => InKafka} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output.{Kafka => OutKafka} + + +class EnrichKafkaSpec extends Specification with CatsIO { + + sequential + + private implicit def logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + val collectorPayloadsStream = "it-enrich-kinesis-collector-payloads" + val enrichedStream = "it-enrich-kinesis-enriched" + val badRowsStream = "it-enrich-kinesis-bad" + + val nbGood = 100l + val nbBad = 10l + + type AggregateGood = List[Event] + type AggregateBad = List[String] + case class Aggregates(good: AggregateGood, bad: AggregateBad) + + val kafkaPort = 9092 + val bootstrapServers = s"localhost:$kafkaPort" + + val consumerConf: Map[String, String] = Map( + "group.id" -> "it-enrich", + "auto.offset.reset" -> "earliest", + "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer" + ) + + val producerConf: Map[String, String] = Map( + "acks" -> "all" + ) + + def run(): IO[Aggregates] = { + + val resources = + for { + blocker <- Blocker[IO] + sink <- Sink.init[IO](blocker, OutKafka(collectorPayloadsStream, bootstrapServers, "", Set.empty, producerConf)) + } yield sink + + resources.use { sink => + val generate = + CollectorPayloadGen.generate[IO](nbGood, nbBad) + .evalMap(events => sink(List(events))) + .onComplete(fs2.Stream.eval(Logger[IO].info(s"Random data has been generated and sent to $collectorPayloadsStream"))) + + def consume(refGood: Ref[IO, AggregateGood], refBad: Ref[IO, AggregateBad]): Stream[IO, Unit] = + consumeGood(refGood).merge(consumeBad(refBad)) + + def consumeGood(ref: Ref[IO, AggregateGood]): 
Stream[IO, Unit] = + Source.init[IO](InKafka(enrichedStream, bootstrapServers, consumerConf)).map(_.record.value).evalMap(aggregateGood(_, ref)) + + def consumeBad(ref: Ref[IO, AggregateBad]): Stream[IO, Unit] = + Source.init[IO](InKafka(badRowsStream, bootstrapServers, consumerConf)).map(_.record.value).evalMap(aggregateBad(_, ref)) + + def aggregateGood(r: Array[Byte], ref: Ref[IO, AggregateGood]): IO[Unit] = + for { + e <- IO(Event.parse(new String(r)).getOrElse(throw new RuntimeException("can't parse enriched event"))) + _ <- ref.update(updateAggregateGood(_, e)) + } yield () + + def aggregateBad(r: Array[Byte], ref: Ref[IO, AggregateBad]): IO[Unit] = { + for { + br <- IO(new String(r)) + _ <- ref.update(updateAggregateBad(_, br)) + } yield () + } + + def updateAggregateGood(aggregate: AggregateGood, e: Event): AggregateGood = + e :: aggregate + + def updateAggregateBad(aggregate: AggregateBad, br: String): AggregateBad = + br :: aggregate + + for { + refGood <- Ref.of[IO, AggregateGood](Nil) + refBad <- Ref.of[IO, AggregateBad](Nil) + _ <- + generate + .merge(consume(refGood, refBad)) + .interruptAfter(30.seconds) + .attempt + .compile + .drain + aggregateGood <- refGood.get + aggregateBad <- refBad.get + } yield Aggregates(aggregateGood, aggregateBad) + } + } + + val aggregates = run().unsafeRunSync() + + "enrich-kinesis" should { + "emit the expected enriched events" in { + aggregates.good.size must beEqualTo(nbGood) + } + + "emit the expected bad rows events" in { + aggregates.bad.size must beEqualTo(nbBad) + } + } +} diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf new file mode 100644 index 000000000..a3ce65c7f --- /dev/null +++ b/modules/kafka/src/main/resources/application.conf @@ -0,0 +1,73 @@ +{ + "input": { + "type": "Kafka" + "consumerConf": { + "enable.auto.commit": "false" + "auto.offset.reset" : "earliest" + "group.id": "enrich" + } + } + + "output": { + "good": { + "type": "Kafka" + "producerConf": { + "acks": "all" + } + "partitionKey": "" + "headers": [] + } + + "pii": { + "type": "Kafka" + "topicName": "" + "bootstrapServers": "" + "producerConf": { + "acks": "all" + } + "partitionKey": "" + "headers": [] + } + + "bad": { + "type": "Kafka" + "producerConf": { + "acks": "all" + } + "partitionKey": "" + "headers": [] + } + } + + "concurrency" : { + "enrich": 256 + "sink": 1 + } + + "remoteAdapters" : { + "connectionTimeout": 10 seconds, + "readTimeout": 45 seconds, + "maxConnections": 10, + "configs" : [] + } + + "monitoring": { + "metrics": { + "cloudwatch": true + } + } + + "telemetry": { + "disable": false + "interval": 15 minutes + "method": POST + "collectorUri": collector-g.snowplowanalytics.com + "collectorPort": 443 + "secure": true + } + + "featureFlags" : { + "acceptInvalid": false + "legacyEnrichmentOrder": false + } +} diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala new file mode 100644 index 000000000..29e0a5de0 --- /dev/null +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Main.scala @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. 
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package com.snowplowanalytics.snowplow.enrich.kafka + +import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.ExecutionContext +import cats.{Applicative, Parallel} +import cats.implicits._ +import cats.effect.{ExitCode, IO, IOApp, Resource, SyncIO} +import fs2.kafka.CommittableConsumerRecord +import com.snowplowanalytics.snowplow.enrich.common.fs2.Run +import com.snowplowanalytics.snowplow.enrich.kafka.generated.BuildInfo + +object Main extends IOApp.WithContext { + + // Kafka records must not exceed 1MB + private val MaxRecordSize = 1000000 + + /** + * An execution context matching the cats effect IOApp default. We create it explicitly so we can + * also use it for our Blaze client. + */ + override protected val executionContextResource: Resource[SyncIO, ExecutionContext] = { + val poolSize = math.max(2, Runtime.getRuntime.availableProcessors()) + Resource + .make(SyncIO(Executors.newFixedThreadPool(poolSize)))(pool => + SyncIO { + pool.shutdown() + pool.awaitTermination(10, TimeUnit.SECONDS) + () + } + ) + .map(ExecutionContext.fromExecutorService) + } + + def run(args: List[String]): IO[ExitCode] = + Run.run[IO, CommittableConsumerRecord[IO, String, Array[Byte]]]( + args, + BuildInfo.name, + BuildInfo.version, + BuildInfo.description, + executionContext, + (_, cliConfig) => IO(cliConfig), + (_, input, _) => Source.init[IO](input), + (blocker, out) => Sink.initAttributed(blocker, out), + (blocker, out) => Sink.initAttributed(blocker, out), + (blocker, out) => Sink.init(blocker, out), + checkpoint, + List.empty, + _.record.value, + MaxRecordSize, + None, + None + ) + + private def checkpoint[F[_]: Applicative: Parallel](records: List[CommittableConsumerRecord[F, String, Array[Byte]]]): F[Unit] = + if (records.isEmpty) Applicative[F].unit + else + records + .groupBy(_.record.partition) + .mapValues(_.maxBy(_.record.offset)) + .values + .toList + .parTraverse_(_.offset.commit) +} diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala new file mode 100644 index 000000000..c228ee201 --- /dev/null +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Sink.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ + +package com.snowplowanalytics.snowplow.enrich.kafka + +import java.util.UUID + +import cats.Parallel +import cats.implicits._ +import cats.effect.{Blocker, Concurrent, ConcurrentEffect, ContextShift, Resource, Timer} + +import fs2.kafka._ + +import com.snowplowanalytics.snowplow.enrich.common.fs2.{AttributedByteSink, AttributedData, ByteSink} +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Output + +object Sink { + + def init[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer]( + blocker: Blocker, + output: Output + ): Resource[F, ByteSink[F]] = + for { + sink <- initAttributed(blocker, output) + } yield (records: List[Array[Byte]]) => sink(records.map(AttributedData(_, UUID.randomUUID().toString, Map.empty))) + + def initAttributed[F[_]: ConcurrentEffect: ContextShift: Parallel: Timer]( + blocker: Blocker, + output: Output + ): Resource[F, AttributedByteSink[F]] = + output match { + case k: Output.Kafka => + mkProducer(blocker, k).map { producer => records => + records.parTraverse_ { record => + producer + .produceOne_(toProducerRecord(k.topicName, record)) + .flatten + .void + } + } + case o => Resource.eval(Concurrent[F].raiseError(new IllegalArgumentException(s"Output $o is not Kafka"))) + } + + private def mkProducer[F[_]: ConcurrentEffect: ContextShift]( + blocker: Blocker, + output: Output.Kafka + ): Resource[F, KafkaProducer[F, String, Array[Byte]]] = { + val producerSettings = + ProducerSettings[F, String, Array[Byte]] + .withBootstrapServers(output.bootstrapServers) + .withProperties(output.producerConf) + .withBlocker(blocker) + .withProperties( + ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"), + ("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer") + ) + + KafkaProducer[F].resource(producerSettings) + } + + private def toProducerRecord(topicName: String, record: AttributedData[Array[Byte]]): ProducerRecord[String, Array[Byte]] = + ProducerRecord(topicName, record.partitionKey, record.data) + .withHeaders(Headers.fromIterable(record.attributes.map(t => Header(t._1, t._2)))) +} diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala new file mode 100644 index 000000000..eaf9c1353 --- /dev/null +++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.kafka/Source.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2022 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ + +package com.snowplowanalytics.snowplow.enrich.kafka + +import cats.effect.{ConcurrentEffect, ContextShift, Timer} + +import fs2.kafka.{CommittableConsumerRecord, ConsumerSettings, KafkaConsumer} +import fs2.Stream + +import com.snowplowanalytics.snowplow.enrich.common.fs2.config.io.Input + +object Source { + + def init[F[_]: ConcurrentEffect: ContextShift: Timer]( + input: Input + ): Stream[F, CommittableConsumerRecord[F, String, Array[Byte]]] = + input match { + case k: Input.Kafka => kafka(k) + case i => Stream.raiseError[F](new IllegalArgumentException(s"Input $i is not Kafka")) + } + + def kafka[F[_]: ConcurrentEffect: ContextShift: Timer]( + input: Input.Kafka + ): Stream[F, CommittableConsumerRecord[F, String, Array[Byte]]] = { + val consumerSettings = + ConsumerSettings[F, String, Array[Byte]] + .withBootstrapServers(input.bootstrapServers) + .withProperties(input.consumerConf) + .withEnableAutoCommit(false) // prevent enabling auto-commits by setting this after user-provided config + .withProperties( + ("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"), + ("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer") + ) + + KafkaConsumer[F] + .stream(consumerSettings) + .subscribeTo(input.topicName) + .records + } +} diff --git a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala index 488ae085d..82a5c81d4 100644 --- a/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala +++ b/modules/kinesis/src/main/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Sink.scala @@ -49,7 +49,7 @@ object Sink { ): Resource[F, ByteSink[F]] = for { sink <- initAttributed(blocker, output) - } yield records => sink(records.map(AttributedData(_, Map.empty))) + } yield (records: List[Array[Byte]]) => sink(records.map(AttributedData(_, UUID.randomUUID().toString, Map.empty))) def initAttributed[F[_]: Concurrent: ContextShift: Parallel: Timer]( blocker: Blocker, @@ -210,14 +210,9 @@ object Sink { private def toKinesisRecords(records: List[AttributedData[Array[Byte]]]): List[PutRecordsRequestEntry] = records.map { r => - val partitionKey = - r.attributes.toList match { // there can be only one attribute : the partition key - case head :: Nil => head._2 - case _ => UUID.randomUUID().toString - } val data = ByteBuffer.wrap(r.data) val prre = new PutRecordsRequestEntry() - prre.setPartitionKey(partitionKey) + prre.setPartitionKey(r.partitionKey) prre.setData(data) prre } diff --git a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala index 0ccd28b12..39f2dc1c3 100644 --- a/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala +++ b/modules/pubsub/src/main/scala/com/snowplowanalytics/snowplow/enrich/pubsub/Sink.scala @@ -38,7 +38,7 @@ object Sink { ): Resource[F, ByteSink[F]] = for { sink <- initAttributed(output) - } yield records => sink(records.map(AttributedData(_, Map.empty))) + } yield (records: List[Array[Byte]]) => sink(records.map(AttributedData(_, "", Map.empty))) def initAttributed[F[_]: Concurrent: ContextShift: Parallel: Timer]( output: Output diff --git a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala 
b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala index 1d7d2b8cd..d6bf282b4 100644 --- a/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala +++ b/modules/rabbitmq/src/main/scala/com/snowplowanalytics/snowplow/enrich/rabbitmq/Sink.scala @@ -40,7 +40,7 @@ object Sink { ): Resource[F, ByteSink[F]] = for { sink <- initAttributed(blocker, output) - } yield records => sink(records.map(AttributedData(_, Map.empty))) + } yield records => sink(records.map(AttributedData(_, "", Map.empty))) def initAttributed[F[_]: ConcurrentEffect: ContextShift: Parallel: Sync: Timer]( blocker: Blocker, diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index b52e1aaa3..9edcdada4 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -86,7 +86,7 @@ object BuildSettings { lazy val pubsubProjectSettings = projectSettings ++ Seq( name := "snowplow-enrich-pubsub", moduleName := "snowplow-enrich-pubsub", - description := "High-performance streaming enrich app with Pub/Sub source, built on top of functional streams", + description := "High-performance streaming enrich app working with Pub/Sub, built on top of functional streams", buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description), buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.pubsub.generated" ) @@ -94,7 +94,7 @@ object BuildSettings { lazy val kinesisProjectSettings = projectSettings ++ Seq( name := "snowplow-enrich-kinesis", moduleName := "snowplow-enrich-kinesis", - description := "High-performance streaming enrich app with Kinesis source, built on top of functional streams", + description := "High-performance streaming enrich app working with Kinesis, built on top of functional streams", buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description), buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kinesis.generated" ) @@ -107,6 +107,14 @@ object BuildSettings { buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.rabbitmq.generated" ) + lazy val kafkaProjectSettings = projectSettings ++ Seq( + name := "snowplow-enrich-kafka", + moduleName := "snowplow-enrich-kafka", + description := "High-performance streaming enrich app working with Kafka, built on top of functional streams", + buildInfoKeys := Seq[BuildInfoKey](organization, name, version, description), + buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kafka.generated" + ) + /** Make package (build) metadata available within source code. */ lazy val scalifiedSettings = Seq( Compile / sourceGenerators += Def.task { @@ -329,6 +337,18 @@ object BuildSettings { lazy val rabbitmqDistrolessBuildSettings = rabbitmqBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless + lazy val kafkaBuildSettings = { + // Project + kafkaProjectSettings ++ buildSettings ++ + // Build and publish + assemblySettings ++ dockerSettingsFocal ++ + Seq(Docker / packageName := "snowplow-enrich-kafka") ++ + // Tests + scoverageSettings ++ noParallelTestExecution + } + + lazy val kafkaDistrolessBuildSettings = kafkaBuildSettings.diff(dockerSettingsFocal) ++ dockerSettingsDistroless + /** Fork a JVM per test in order to not reuse enrichment registries. 
*/ def oneJVMPerTest(tests: Seq[TestDefinition]): Seq[Tests.Group] = tests.map(t => Tests.Group(t.name, Seq(t), Tests.SubProcess(ForkOptions()))) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d3d024d77..1308acaaa 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -70,7 +70,7 @@ object Dependencies { val gcpSdk = "2.7.2" val kinesisClient = "1.14.5" val awsSdk2 = "2.17.287" - val kafka = "2.8.1" + val kafka = "2.8.2" val mskAuth = "1.1.4" val nsqClient = "1.3.0" val jackson = "2.13.3" @@ -81,6 +81,7 @@ object Dependencies { val catsEffect = "2.5.0" val fs2PubSub = "0.18.1" val fs2Aws = "3.1.1" + val fs2Kafka = "1.10.0" val fs2BlobStorage = "0.8.6" val http4s = "0.21.33" val log4cats = "1.3.0" @@ -191,6 +192,7 @@ object Dependencies { val fs2Aws = "io.laserdisc" %% "fs2-aws" % V.fs2Aws val fs2 = "co.fs2" %% "fs2-core" % V.fs2 val fs2Io = "co.fs2" %% "fs2-io" % V.fs2 + val fs2Kafka = "com.github.fd4s" %% "fs2-kafka" % V.fs2Kafka val kinesisSdk2 = "software.amazon.awssdk" % "kinesis" % V.awsSdk2 val dynamoDbSdk2 = "software.amazon.awssdk" % "dynamodb" % V.awsSdk2 val s3Sdk2 = "software.amazon.awssdk" % "s3" % V.awsSdk2 @@ -344,6 +346,11 @@ object Dependencies { fs2RabbitMQ ) + val kafkaDependencies = Seq( + fs2Kafka, + kafkaClients // override kafka-clients 2.8.1 from fs2Kafka to address https://security.snyk.io/vuln/SNYK-JAVA-ORGAPACHEKAFKA-3027430 + ) + // exclusions val exclusions = Seq( "org.apache.tomcat.embed" % "tomcat-embed-core"
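For orientation, the sketch below shows the consume/transform/produce/commit loop that the new enrich-kafka Source and Sink are built around, written directly against fs2-kafka 1.10.0 and cats-effect 2. It is an editorial illustration, not part of the patch: the broker address, group id, topic names and commit-batch settings are assumptions, and the real module wires its Source and Sink through the shared common-fs2 machinery rather than a standalone IOApp.

package com.snowplowanalytics.snowplow.enrich.kafka.sketch

import scala.concurrent.duration._

import cats.implicits._
import cats.effect.{ExitCode, IO, IOApp}

import fs2.kafka._

object PassthroughSketch extends IOApp {

  // Assumption: a local broker and illustrative topic/group names.
  val consumerSettings: ConsumerSettings[IO, String, Array[Byte]] =
    ConsumerSettings[IO, String, Array[Byte]]
      .withBootstrapServers("localhost:9092")
      .withGroupId("enrich-kafka-sketch")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withEnableAutoCommit(false) // offsets are committed manually, after producing

  val producerSettings: ProducerSettings[IO, String, Array[Byte]] =
    ProducerSettings[IO, String, Array[Byte]]
      .withBootstrapServers("localhost:9092")

  def run(args: List[String]): IO[ExitCode] =
    KafkaProducer.resource(producerSettings).use { producer =>
      KafkaConsumer
        .stream(consumerSettings)
        .subscribeTo("collector-payloads")
        .records
        .evalMap { committable =>
          // Real enrichment is elided: the payload is forwarded unchanged.
          val out = ProducerRecord("enriched", committable.record.key, committable.record.value)
          producer.produce(ProducerRecords.one(out)).flatten.as(committable.offset)
        }
        .through(commitBatchWithin(500, 10.seconds)) // commit only after records are produced
        .compile
        .drain
        .as(ExitCode.Success)
    }
}

As in the module's Source, auto-commit is disabled and offsets are committed only after the corresponding records have been produced downstream, which is what gives the pipeline at-least-once delivery.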
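The partition-key refactor in this patch moves the key out of the attributes map and into a dedicated AttributedData field; the Kafka Sink then uses that field as the record key and turns the remaining attributes into Kafka headers. Below is a small illustrative sketch of that mapping, using a local stand-in case class rather than the real AttributedData from common-fs2.

// Editorial sketch (not part of the patch) mirroring toProducerRecord in the new Sink.
// AttributedDataSketch is a local stand-in for common-fs2's AttributedData
// (data, partitionKey, attributes).
import fs2.kafka.{Header, Headers, ProducerRecord}

final case class AttributedDataSketch(
  data: Array[Byte],
  partitionKey: String,
  attributes: Map[String, String]
)

object HeaderMappingSketch {

  def toProducerRecord(topic: String, r: AttributedDataSketch): ProducerRecord[String, Array[Byte]] =
    ProducerRecord(topic, r.partitionKey, r.data)
      .withHeaders(Headers.fromIterable(r.attributes.map { case (k, v) => Header(k, v) }))

  // Example: a record keyed by "pk-123" on topic "enriched", carrying one "app_id" header.
  val example: ProducerRecord[String, Array[Byte]] =
    toProducerRecord(
      "enriched",
      AttributedDataSketch("event".getBytes("UTF-8"), "pk-123", Map("app_id" -> "shop"))
    )
}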