Add separate good/bad sink configurations #360

Merged 1 commit on Oct 27, 2023
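This change replaces the shared sink configuration (top-level good/bad stream names plus a single sink block) with a separate, fully configurable sink per stream. For example, the minimal Kafka config goes from

streams {
  good = "good"
  bad = "bad"

  sink {
    brokers = "localhost:9092,another.host:9092"
  }
}

to

streams {
  good {
    name = "good"
    brokers = "localhost:9092,another.host:9092"
  }
  bad {
    name = "bad"
    brokers = "localhost:9092,another.host:9092"
  }
}

Each sink block now carries its own name, connection settings, maxBytes and, for the Kafka and Kinesis sinks, its own buffer, so the two streams can be tuned independently.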
82 changes: 56 additions & 26 deletions examples/config.kafka.extended.hocon
@@ -163,28 +163,17 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kafka

# Or Kafka
# Events which have successfully been collected will be stored in the good stream/topic
good {

name = "good"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
@@ -193,6 +182,7 @@ collector {
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
@@ -203,18 +193,58 @@
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {

name = "bad"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
}

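Note: because each stream now has its own sink block, producer-level overrides such as producerConf (see the commented example above) can differ between good and bad. This is not part of the example file, just a hypothetical sketch:

streams {
  good {
    name = "good"
    brokers = "localhost:9092,another.host:9092"
    producerConf {
      # wait for acknowledgement from all in-sync replicas for collected events
      acks = all
    }
  }
  bad {
    name = "bad"
    brokers = "localhost:9092,another.host:9092"
    producerConf {
      # leader-only acknowledgement is enough for bad rows
      acks = "1"
    }
  }
}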
14 changes: 8 additions & 6 deletions examples/config.kafka.minimal.hocon
@@ -3,11 +3,13 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
brokers = "localhost:9092,another.host:9092"
good {
name = "good"
brokers = "localhost:9092,another.host:9092"
}
bad {
name = "bad"
brokers = "localhost:9092,another.host:9092"
}
}
}
}
122 changes: 95 additions & 27 deletions examples/config.kinesis.extended.hocon
@@ -163,26 +163,19 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters

bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kinesis
good {

# Events which have successfully been collected will be stored in the good stream/topic
name = "good"

# Region where the streams are located
region = "eu-central-1"

@@ -193,15 +186,13 @@ collector {
# Thread pool size for Kinesis and SQS API requests
threadPoolSize = 10

# Optional SQS buffer for good and bad events (respectively).
# Optional SQS buffer for good events.
# When messages can't be sent to Kinesis, they will be sent to SQS.
# If not configured, sending to Kinesis will be retried.
# This should only be set up for the Kinesis sink, where it acts as a failsafe.
# For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored.
#sqsGoodBuffer = {{sqsGoodBuffer}}

#sqsBadBuffer = {{sqsBadBuffer}}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 192 kb
@@ -242,19 +233,96 @@ collector {
# This is the interval for the calls.
# /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
startupCheckInterval = 1 second

# Incoming events are stored in a buffer before being sent to Kinesis.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {

name = "bad"

# Region where the streams are located
region = "eu-central-1"

## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}

# Thread pool size for Kinesis and SQS API requests
threadPoolSize = 10

# Optional SQS buffer for bad events.
# When messages can't be sent to Kinesis, they will be sent to SQS.
# If not configured, sending to Kinesis will be retried.
# This should only be set up for the Kinesis sink, where it acts as a failsafe.
# For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored.
#sqsBadBuffer = {{sqsBadBuffer}}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 192 kb
# SQS has a record size limit of 256 kb, but records are encoded with Base64,
# which adds approximately 33% of the size, so we set the limit to 256 kb * 3/4
sqsMaxBytes = 192000

# The following are used to authenticate for the Amazon Kinesis and SQS sinks.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = iam
secretKey = iam
}

# Optional
backoffPolicy {
# Minimum backoff period in milliseconds
minBackoff = 500
# Maximum backoff period in milliseconds
maxBackoff = 1500
# Failed inserts are retried forever.
# In case of just Kinesis without SQS, number of retries before setting /sink-health unhealthy.
# In case of Kinesis + SQS, number of retries with one before retrying with the other.
maxRetries = 3
}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
# If SQS buffer is activated, sqsMaxBytes is used instead
maxBytes = 1000000

# When collector starts, it checks if Kinesis streams exist with describeStreamSummary
# and if SQS buffers exist with getQueueUrl (if configured).
# This is the interval for the calls.
# /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
startupCheckInterval = 1 second

# Incoming events are stored in a buffer before being sent to Kinesis.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
}

# Telemetry sends heartbeat events to external pipeline.
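Note on the optional SQS failsafe described above: after the split, each sink references only its own buffer queue (sqsGoodBuffer under good, sqsBadBuffer under bad). A hypothetical sketch with made-up queue URLs, purely to show where the settings now live:

streams {
  good {
    name = "good"
    region = "eu-central-1"
    # hypothetical queue URL; substitute your own account id and queue name
    sqsGoodBuffer = "https://sqs.eu-central-1.amazonaws.com/123456789012/collector-good-buffer"
  }
  bad {
    name = "bad"
    region = "eu-central-1"
    # hypothetical queue URL
    sqsBadBuffer = "https://sqs.eu-central-1.amazonaws.com/123456789012/collector-bad-buffer"
  }
}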
10 changes: 6 additions & 4 deletions examples/config.kinesis.minimal.hocon
@@ -3,10 +3,12 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
good {
name = "good"
region = eu-central-1
}
bad {
name = "bad"
region = eu-central-1
}
}
36 changes: 20 additions & 16 deletions examples/config.nsq.extended.hocon
@@ -163,27 +163,14 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = nsq

# Or NSQ
# Events which have successfully been collected will be stored in the good stream/topic
good {
name = "good"
## Host name for nsqd
host = "nsqHost"
## TCP port for nsqd, 4150 by default
@@ -194,6 +181,23 @@
# Default: 1 MB
maxBytes = 1000000
}

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger than the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {
name = "bad"
## Host name for nsqd
host = "nsqHost"
## TCP port for nsqd, 4150 by default
port = 4150

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000
}
}

# Telemetry sends heartbeat events to external pipeline.
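As with the other sinks, the good and bad topics no longer need to share an nsqd producer. A hypothetical sketch with placeholder host names:

streams {
  good {
    name = "good"
    # placeholder host name
    host = "nsqd-good.internal"
    port = 4150
  }
  bad {
    name = "bad"
    # placeholder host name
    host = "nsqd-bad.internal"
    port = 4150
  }
}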
11 changes: 7 additions & 4 deletions examples/config.nsq.minimal.hocon
@@ -3,10 +3,13 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
good {
name = "good"
host = "nsqHost"
}

bad {
name = "bad"
host = "nsqHost"
}
}