Add enrich-kafka (close #648)
oguzhanunlu committed Oct 17, 2022
1 parent 6baf6f1 commit 2657331
Showing 29 changed files with 1,238 additions and 42 deletions.
13 changes: 11 additions & 2 deletions .github/workflows/ci.yml
@@ -49,6 +49,13 @@ jobs:
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Run integration tests
id: integrationTest
run: |
sbt "project kafka" "docker:publishLocal"
docker-compose -f integration-tests/enrich-kafka/docker-compose.yml up -d
sbt "project kafka" "it:test"
docker-compose -f integration-tests/enrich-kafka/docker-compose.yml down
- name: Check Scala formatting
if: ${{ always() }}
run: sbt scalafmtCheckAll
@@ -85,7 +92,8 @@ jobs:
"project streamNsq; set assembly / test := {}; assembly" \
"project pubsub; set assembly / test := {}; assembly" \
"project kinesis; set assembly / test := {}; assembly" \
"project rabbitmq; set assembly / test := {}; assembly"
"project rabbitmq; set assembly / test := {}; assembly" \
"project kafka; set assembly / test := {}; assembly"
- name: Create GitHub release and attach artifacts
uses: softprops/action-gh-release@v1
with:
@@ -100,6 +108,7 @@
modules/pubsub/target/scala-2.12/snowplow-enrich-pubsub-${{ steps.ver.outputs.tag }}.jar
modules/kinesis/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar
modules/rabbitmq/target/scala-2.12/snowplow-enrich-rabbitmq-${{ steps.ver.outputs.tag }}.jar
modules/kafka/target/scala-2.12/snowplow-enrich-kafka-${{ steps.ver.outputs.tag }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

@@ -115,6 +124,7 @@ jobs:
- streamNsq
- pubsub
- kinesis
- kafka
- rabbitmq
include:
- suffix: ""
@@ -231,4 +241,3 @@ jobs:
PGP_SECRET: ${{ secrets.SONA_PGP_SECRET }}
SONATYPE_USERNAME: ${{ secrets.SONA_USER }}
SONATYPE_PASSWORD: ${{ secrets.SONA_PASS }}
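The new integrationTest step builds the enrich-kafka Docker image locally, brings up a Kafka broker with docker-compose, runs the specs under sbt's it configuration against it, and tears the broker down again. As a rough illustration of what such a spec exercises, here is a minimal fs2-kafka round trip (a hypothetical sketch, not the actual test under modules/kafka/src/it; the topic names and the localhost:9092 broker address are assumptions taken from the example configs and the compose file):

// Hypothetical sketch of an it:test spec: produce one raw payload to the input topic
// and wait for one record on the enriched topic (assumes the docker-compose broker).
import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.kafka._

object EnrichKafkaRoundTrip extends IOApp.Simple {
  val broker = "localhost:9092" // assumption: address exposed by integration-tests/enrich-kafka/docker-compose.yml

  val producerSettings =
    ProducerSettings[IO, String, String].withBootstrapServers(broker)

  val consumerSettings =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers(broker)
      .withGroupId("enrich-kafka-it")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  def run: IO[Unit] =
    KafkaProducer.resource(producerSettings).use { producer =>
      for {
        _ <- producer
               .produce(ProducerRecords.one(ProducerRecord("collector-payloads", "key", "<serialized collector payload>")))
               .flatten
        first <- KafkaConsumer
                   .stream(consumerSettings)
                   .evalTap(_.subscribeTo("enriched"))
                   .flatMap(_.stream)
                   .take(1)
                   .compile
                   .lastOrError
                   .timeout(2.minutes)
        _ <- IO.println(s"received enriched event of ${first.record.value.length} chars")
      } yield ()
    }
}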

28 changes: 28 additions & 0 deletions .github/workflows/lacework.yml
@@ -101,3 +101,31 @@ jobs:
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/stream-enrich-nsq ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull

- name: Scan enrich-kafka
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-enrich-kafka ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull

- name: Scan enrich-kafka distroless
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-enrich-kafka ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull

- name: Scan enrich-rabbitmq
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }} --build-id ${{ github.run_id }} --no-pull

- name: Scan enrich-rabbitmq distroless
env:
LW_ACCESS_TOKEN: ${{ secrets.LW_ACCESS_TOKEN }}
LW_ACCOUNT_NAME: ${{ secrets.LW_ACCOUNT_NAME }}
LW_SCANNER_SAVE_RESULTS: ${{ !contains(steps.version.outputs.tag, 'rc') }}
run: ./lw-scanner image evaluate snowplow/snowplow-enrich-rabbitmq ${{ steps.ver.outputs.tag }}-distroless --build-id ${{ github.run_id }} --no-pull
27 changes: 26 additions & 1 deletion build.sbt
@@ -23,7 +23,7 @@ lazy val root = project.in(file("."))
.settings(projectSettings)
.settings(compilerSettings)
.settings(resolverSettings)
.aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, rabbitmq, rabbitmqDistroless)
.aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, kafka, kafkaDistroless, rabbitmq, rabbitmqDistroless)

lazy val common = project
.in(file("modules/common"))
@@ -151,6 +151,31 @@ lazy val kinesisDistroless = project
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)

lazy val kafka = project
.in(file("modules/kafka"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)
.settings(kafkaBuildSettings)
.settings(libraryDependencies ++= kafkaDependencies ++ Seq(
// integration test dependencies
specs2CEIt,
specs2ScalacheckIt
))
.settings(excludeDependencies ++= exclusions)
.settings(Defaults.itSettings)
.configs(IntegrationTest)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)

lazy val kafkaDistroless = project
.in(file("modules/distroless/kafka"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin, LauncherJarPlugin)
.settings(sourceDirectory := (kafka / sourceDirectory).value)
.settings(kafkaDistrolessBuildSettings)
.settings(libraryDependencies ++= kafkaDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)

lazy val bench = project
.in(file("modules/bench"))
.dependsOn(pubsub % "test->test")
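The kafka and kafkaDistroless projects reuse shared settings groups (kafkaBuildSettings, kafkaDistrolessBuildSettings) defined in the project/ build definition, which is not part of this diff. As a rough idea of what such a group typically bundles for these modules, here is a hypothetical sketch (names and values are assumptions, not the repository's actual definition):

// Hypothetical sketch only: a settings group wiring the artifact name, Docker image
// and sbt-buildinfo output for the kafka module. The real kafkaBuildSettings differs.
lazy val kafkaBuildSettings = Seq(
  name := "snowplow-enrich-kafka",
  Docker / packageName := "snowplow/snowplow-enrich-kafka",      // image name scanned in lacework.yml above
  buildInfoKeys := Seq[BuildInfoKey](organization, name, version),
  buildInfoPackage := "com.snowplowanalytics.snowplow.enrich.kafka.generated" // assumed package
)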
236 changes: 236 additions & 0 deletions config/config.kafka.extended.hocon
@@ -0,0 +1,236 @@
{
# Where to read collector payloads from
"input": {
"type": "Kafka"

# Name of the Kafka topic to read from
"topicName": "collector-payloads"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka Consumer configuration
# See https://kafka.apache.org/documentation/#consumerconfigs for all properties
"consumerConf": {
"auto.offset.reset" : "earliest"
"session.timeout.ms": "45000"
}
}

"output": {
# Enriched events output
"good": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "enriched"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}

# Optional. Enriched event field to use as Kafka partition key
"partitionKey": "app_id"

# Optional. Enriched event fields to add as Kafka record headers
"headers": [ "app_id" ]
}

# Optional. Pii events output. Should be omitted if pii events are not emitted
"pii": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "pii"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}

# Optional. Enriched event field to use as Kafka partition key
"partitionKey": "app_id"

# Optional. Enriched event fields to add as Kafka record headers
"headers": [ "app_id" ]
}

# Bad rows output
"bad": {
"type": "Kafka"

# Name of the Kafka topic to write to
"topicName": "bad"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka producer configuration
# See https://kafka.apache.org/documentation/#producerconfigs for all properties
"producerConf": {
"acks": "all"
}
}
}

# Optional. Concurrency of the app
"concurrency" : {
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
# WARNING: if greater than 1, records can get checkpointed before they are sunk
"sink": 1
}

# Optional, period after which enrich assets should be checked for updates
# no assets will be updated if the key is absent
"assetsUpdatePeriod": "7 days"

# Optional, configuration of remote adapters
"remoteAdapters": {
# how long enrich waits to establish a connection to remote adapters
"connectionTimeout": "10 seconds",
# how long enrich waits to get a response from remote adapters
"readTimeout": "45 seconds",
# how many connections enrich opens at maximum for remote adapters
# increasing this could help with throughput in case of adapters with high latency
"maxConnections": 10,
# a list of remote adapter configs
"configs": [
{
"vendor": "com.example",
"version": "v1",
"url": "https://remote-adapter.com"
}
]
}

"monitoring": {

# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
}

# Optional, configure how metrics are reported
"metrics": {

# Optional. Send metrics to a StatsD server on localhost
"statsd": {
"hostname": "localhost"
"port": 8125

# Required, how frequently to report metrics
"period": "10 seconds"

# Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": enrich
}

# Optional, override the default metric prefix
# "prefix": "snowplow.enrich."
}

# Optional. Log to stdout using Slf4j
"stdout": {
"period": "10 seconds"

# Optional, override the default metric prefix
# "prefix": "snowplow.enrich."
}

# Optional. Send KCL and KPL metrics to Cloudwatch
"cloudwatch": true
}
}

# Optional, configure telemetry
# All the fields are optional
"telemetry": {

# Set to true to disable telemetry
"disable": false

# Interval for the heartbeat event
"interval": 15 minutes

# HTTP method used to send the heartbeat event
"method": POST

# URI of the collector receiving the heartbeat event
"collectorUri": collector-g.snowplowanalytics.com

# Port of the collector receiving the heartbeat event
"collectorPort": 443

# Whether to use https or not
"secure": true

# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": my_pipeline

# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": hfy67e5ydhtrd

# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": 665bhft5u6udjf

# Name of the terraform module that deployed the app
"moduleName": enrich-kafka-ce

# Version of the terraform module that deployed the app
"moduleVersion": 1.0.0
}

# Optional. To activate/deactivate enrich features that are still in beta
# or that are here for transition.
# This section might change in future versions
"featureFlags" : {

# Enrich 3.0.0 introduces the validation of the enriched events against atomic schema
# before emitting.
# If set to false, a bad row will be emitted instead of the enriched event
# if validation fails.
# If set to true, invalid enriched events will be emitted, as before.
# WARNING: this feature flag will be removed in a future version
# and it will become impossible to emit invalid enriched events.
# More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
"acceptInvalid": false

# In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment
# incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true
# to keep the erroneous behaviour of those previous versions. This flag will be removed in a
# future version.
# More details: https://github.com/snowplow/enrich/issues/619
"legacyEnrichmentOrder": false
}

# Optional. Configuration for experimental/preview features
"experimental": {
# Whether to export metadata using a webhook URL.
# Follows iglu-webhook protocol.
"metadata": {
"endpoint": "https://my_pipeline.my_domain.com/iglu"
"interval": 5 minutes
"organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455"
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}
}
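To make the knobs above concrete, the sketch below shows how these sections could map onto a Kafka client, here via fs2-kafka (an assumption about the implementation; the module's real wiring is not part of this diff). Arbitrary consumerConf/producerConf keys are passed straight through to the underlying client as properties, while partitionKey and headers become the record key and record headers of each outgoing enriched event:

// Illustrative sketch, assuming fs2-kafka: how the config sections above could reach the client.
import cats.effect.IO
import fs2.kafka._

// "consumerConf" / "producerConf" are free-form property maps handed to the underlying client:
val consumerSettings =
  ConsumerSettings[IO, String, Array[Byte]]
    .withBootstrapServers("localhost:9092")            // "bootstrapServers"
    .withProperties(Map(
      "auto.offset.reset"  -> "earliest",              // "consumerConf"
      "session.timeout.ms" -> "45000"
    ))

val producerSettings =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withProperties(Map("acks" -> "all"))              // "producerConf"

// "partitionKey" picks an enriched-event field for the record key and
// "headers" lists fields copied into Kafka record headers:
def toRecord(appId: String, enrichedTsv: String): ProducerRecord[String, String] =
  ProducerRecord("enriched", appId, enrichedTsv)       // key = app_id value
    .withHeaders(Headers(Header("app_id", appId.getBytes("UTF-8"))))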
18 changes: 18 additions & 0 deletions config/config.kafka.minimal.hocon
@@ -0,0 +1,18 @@
{
"input": {
"topicName": "collector-payloads"
"bootstrapServers": "localhost:9092"
}

"output": {
"good": {
"topicName": "enriched"
"bootstrapServers": "localhost:9092"
}

"bad": {
"topicName": "bad"
"bootstrapServers": "localhost:9092"
}
}
}
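The minimal file only names the input and the good/bad outputs; every other section from the extended example falls back to a packaged default. Enrich has its own config loader, so the snippet below is only a generic Typesafe Config sketch of that HOCON fallback behaviour (file name and key paths taken from the examples above):

// Generic HOCON merging sketch (Typesafe Config), not enrich's actual loader.
import com.typesafe.config.ConfigFactory
import java.io.File

val userConfig = ConfigFactory.parseFile(new File("config.kafka.minimal.hocon"))

// Keys missing from the minimal file (concurrency, monitoring, telemetry, featureFlags, ...)
// are filled in from defaults on the classpath; only input and output must be given.
val resolved = userConfig.withFallback(ConfigFactory.load()).resolve()

println(resolved.getString("input.topicName"))       // collector-payloads
println(resolved.getString("output.good.topicName")) // enriched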