Skip to content

Commit

Permalink
Add RabbitMQ asset (close #679)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Sep 27, 2022
1 parent 2c0e3a7 commit 3cc6ba6
Show file tree
Hide file tree
Showing 15 changed files with 922 additions and 27 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ jobs:
"project streamKafka; set assembly / test := {}; assembly" \
"project streamNsq; set assembly / test := {}; assembly" \
"project pubsub; set assembly / test := {}; assembly" \
"project kinesis; set assembly / test := {}; assembly"
"project kinesis; set assembly / test := {}; assembly" \
"project rabbitmq; set assembly / test := {}; assembly"
- name: Create GitHub release and attach artifacts
uses: softprops/action-gh-release@v1
with:
Expand All @@ -98,6 +99,7 @@ jobs:
modules/stream/nsq/target/scala-2.12/snowplow-stream-enrich-nsq-${{ steps.ver.outputs.tag }}.jar
modules/pubsub/target/scala-2.12/snowplow-enrich-pubsub-${{ steps.ver.outputs.tag }}.jar
modules/kinesis/target/scala-2.12/snowplow-enrich-kinesis-${{ steps.ver.outputs.tag }}.jar
modules/rabbitmq/target/scala-2.12/snowplow-enrich-rabbitmq-${{ steps.ver.outputs.tag }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

Expand All @@ -113,6 +115,11 @@ jobs:
- streamNsq
- pubsub
- kinesis
- rabbitmq
include:
- suffix: ""
- suffix: -experimental
app: rabbitmq
steps:
- uses: actions/checkout@v2
if: startsWith(github.ref, 'refs/tags/')
Expand Down Expand Up @@ -144,7 +151,7 @@ jobs:
- name: Get app package name
id: packageName
run: |
export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1)
export PACKAGE_NAME=$(sbt "project ${{ matrix.app }}" dockerAlias -Dsbt.log.noformat=true | sed -n '/\[info\]/ s/\[info\] //p' | tail -1 | tr -d '\n' | cut -d":" -f1)${{ matrix.suffix }}
echo "::set-output name=package_name::$PACKAGE_NAME"
- name: Get app base directory
id: baseDirectory
Expand Down
21 changes: 20 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lazy val root = project.in(file("."))
.settings(projectSettings)
.settings(compilerSettings)
.settings(resolverSettings)
.aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin)
.aggregate(common, commonFs2, pubsub, pubsubDistroless, kinesis, kinesisDistroless, streamCommon, streamKinesis, streamKinesisDistroless, streamKafka, streamKafkaDistroless, streamNsq, streamNsqDistroless, streamStdin, rabbitmq, rabbitmqDistroless)

lazy val common = project
.in(file("modules/common"))
Expand Down Expand Up @@ -155,3 +155,22 @@ lazy val bench = project
.in(file("modules/bench"))
.dependsOn(pubsub % "test->test")
.enablePlugins(JmhPlugin)

// RabbitMQ flavour of enrich: an assembly-capable sbt module that reuses the
// shared fs2 streaming core (commonFs2) with RabbitMQ-specific build settings
// and dependencies. Mirrors the structure of the other asset modules
// (pubsub, kinesis, ...) aggregated by the root project.
// NOTE(review): rabbitmqBuildSettings, rabbitmqDependencies, exclusions and
// betterMonadicFor are defined outside this view (presumably in
// project/BuildSettings.scala / project/Dependencies.scala) — confirm there.
lazy val rabbitmq = project
.in(file("modules/rabbitmq"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)
.settings(rabbitmqBuildSettings)
.settings(libraryDependencies ++= rabbitmqDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)

// Distroless-image variant of the rabbitmq module. It shares the exact same
// sources (sourceDirectory is pointed at the rabbitmq module) and dependency
// set, differing only in packaging settings (rabbitmqDistrolessBuildSettings)
// and the extra LauncherJarPlugin needed for a shell-less base image.
// NOTE(review): rabbitmqDistrolessBuildSettings, rabbitmqDependencies,
// exclusions and betterMonadicFor are defined outside this view — confirm.
lazy val rabbitmqDistroless = project
.in(file("modules/distroless/rabbitmq"))
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin, LauncherJarPlugin)
.settings(sourceDirectory := (rabbitmq / sourceDirectory).value)
.settings(rabbitmqDistrolessBuildSettings)
.settings(libraryDependencies ++= rabbitmqDependencies)
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
.dependsOn(commonFs2)
281 changes: 281 additions & 0 deletions config/config.rabbitmq.extended.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
{
# Where to read collector payloads from
"input": {
"type": "RabbitMQ"

"cluster": {
# Nodes of RabbitMQ cluster
"nodes": [
{
"host": "localhost"
"port": 5672
}
]
# Username to connect to the cluster
"username": "guest"
# Password to connect to the cluster
"password": "guest"
# Virtual host to use when connecting to the cluster
"virtualHost": "/"

# Optional. Whether to use SSL or not to communicate with the cluster
"ssl": false
# Optional. Timeout for the connection to the cluster (in seconds)
"connectionTimeout": 5
# Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
"internalQueueSize": 1000
# Optional. Whether the AMQP Java driver should try to recover broken connections
"automaticRecovery": true
# Optional. Interval to check that the TCP connection to the cluster is still alive
"requestedHeartbeat": 100
}

# Queue to read collector payloads from
"queue": "raw"

# Optional. Settings for backoff policy for checkpointing.
# Records are checkpointed after all the records of the same chunk have been enriched
"checkpointBackoff": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}

"output": {
# Enriched events output
"good": {
"type": "RabbitMQ"

"cluster": {
# Nodes of RabbitMQ cluster
"nodes": [
{
"host": "localhost"
"port": 5672
}
]
# Username to connect to the cluster
"username": "guest"
# Password to connect to the cluster
"password": "guest"
# Virtual host to use when connecting to the cluster
"virtualHost": "/"

# Optional. Whether to use SSL or not to communicate with the cluster
"ssl": false
# Optional. Timeout for the connection to the cluster (in seconds)
"connectionTimeout": 5
# Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
"internalQueueSize": 1000
# Optional. Whether the AMQP Java driver should try to recover broken connections
"automaticRecovery": true
# Optional. Interval to check that the TCP connection to the cluster is still alive
"requestedHeartbeat": 100
}

# Exchange to send the enriched events to
"exchange": "enriched"
# Routing key to use when sending the enriched events to the exchange
"routingKey": "enriched"

# Optional. Policy to retry if writing to RabbitMQ fails
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}

# Bad rows output
"bad": {
"type": "RabbitMQ"

"cluster": {
# Nodes of RabbitMQ cluster
"nodes": [
{
"host": "localhost"
"port": 5672
}
]
# Username to connect to the cluster
"username": "guest"
# Password to connect to the cluster
"password": "guest"
# Virtual host to use when connecting to the cluster
"virtualHost": "/"

# Optional. Whether to use SSL or not to communicate with the cluster
"ssl": false
# Optional. Timeout for the connection to the cluster (in seconds)
"connectionTimeout": 5
# Optional. Size of the fs2’s bounded queue used internally to communicate with the AMQP Java driver
"internalQueueSize": 1000
# Optional. Whether the AMQP Java driver should try to recover broken connections
"automaticRecovery": true
# Optional. Interval to check that the TCP connection to the cluster is still alive
"requestedHeartbeat": 100
}

# Exchange to send the bad rows to
"exchange": "bad-1"
# Routing key to use when sending the bad rows to the exchange
"routingKey": "bad-1"

# Optional. Policy to retry if writing to RabbitMQ fails
"backoffPolicy": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
}
}

# Optional. Concurrency of the app
"concurrency" : {
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
"sink": 3
}

# Optional, period after which enrich assets should be checked for updates
# no assets will be updated if the key is absent
"assetsUpdatePeriod": "7 days"

# Optional, configuration of remote adapters
"remoteAdapters": {
# how long enrich waits to establish a connection to remote adapters
"connectionTimeout": "10 seconds"
# how long enrich waits to get a response from remote adapters
"readTimeout": "45 seconds"
# how many connections enrich opens at maximum for remote adapters
# increasing this could help with throughput in case of adapters with high latency
"maxConnections": 10
# a list of remote adapter configs
"configs": [
{
"vendor": "com.example"
"version": "v1"
"url": "https://remote-adapter.com"
}
]
}

"monitoring": {

# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
}

# Optional, configure how metrics are reported
"metrics": {

# Optional. Send metrics to a StatsD server on localhost
"statsd": {
"hostname": "localhost"
"port": 8125

# Required, how frequently to report metrics
"period": "10 seconds"

# Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": enrich
}

# Optional, override the default metric prefix
# "prefix": "snowplow.enrich."
}

# Optional. Log to stdout using Slf4j
"stdout": {
"period": "10 seconds"

# Optional, override the default metric prefix
# "prefix": "snowplow.enrich."
}

# Optional. Send metrics to CloudWatch
# NOTE(review): the original comment mentioned KCL/KPL, which are
# Kinesis-specific libraries — confirm this flag has any effect in the
# RabbitMQ asset, or remove it from this example config.
"cloudwatch": true
}
}

# Optional, configure telemetry
# All the fields are optional
"telemetry": {

# Set to true to disable telemetry
"disable": false

# Interval for the heartbeat event
"interval": 15 minutes

# HTTP method used to send the heartbeat event
"method": POST

# URI of the collector receiving the heartbeat event
"collectorUri": collector-g.snowplowanalytics.com

# Port of the collector receiving the heartbeat event
"collectorPort": 443

# Whether to use https or not
"secure": true

# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": my_pipeline

# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": hfy67e5ydhtrd

# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": 665bhft5u6udjf

# Name of the terraform module that deployed the app
"moduleName": enrich-rabbitmq-ce

# Version of the terraform module that deployed the app
"moduleVersion": 1.0.0
}

# Optional. To activate/deactive enrich features that are still in beta
# or that are here for transition.
# This section might change in future versions
"featureFlags" : {

# Enrich 3.0.0 introduces the validation of the enriched events against atomic schema
# before emitting.
# If set to false, a bad row will be emitted instead of the enriched event
# if validation fails.
# If set to true, invalid enriched events will be emitted, as before.
# WARNING: this feature flag will be removed in a future version
# and it will become impossible to emit invalid enriched events.
# More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
"acceptInvalid": false

# In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment
# incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true
# to keep the erroneous behaviour of those previous versions. This flag will be removed in a
# future version.
# More details: https://github.com/snowplow/enrich/issues/619
"legacyEnrichmentOrder": false
}

# Optional. Configuration for experimental/preview features
"experimental": {
# Whether to export metadata using a webhook URL.
# Follows iglu-webhook protocol.
"metadata": {
"endpoint": "https://my_pipeline.my_domain.com/iglu"
"interval": 5 minutes
"organizationId": "c5f3a09f-75f8-4309-bec5-fea560f78455"
"pipelineId": "75a13583-5c99-40e3-81fc-541084dfc784"
}
}
}
Loading

0 comments on commit 3cc6ba6

Please sign in to comment.