diff --git a/examples/config.kafka.extended.hocon b/examples/config.kafka.extended.hocon index 9d4a6e50c..7349deec5 100644 --- a/examples/config.kafka.extended.hocon +++ b/examples/config.kafka.extended.hocon @@ -163,28 +163,17 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters - bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = kafka - - # Or Kafka + # Events which have successfully been collected will be stored in the good stream/topic + good { + + name = "good" brokers = "localhost:9092,another.host:9092" + ## Number of retries to perform before giving up on sending a record retries = 10 # The kafka producer has a variety of possible configuration options defined at @@ -193,6 +182,7 @@ collector { # "bootstrap.servers" = brokers # "buffer.memory" = buffer.byteLimit # "linger.ms" = buffer.timeLimit + #producerConf { # acks = all # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" @@ -203,18 +193,58 @@ collector { # If a record is bigger, a size violation bad row is emitted instead # Default: 1 MB maxBytes = 1000000 + + # Incoming events are stored in a buffer before being sent to Kafka. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { + + name = "bad" + brokers = "localhost:9092,another.host:9092" + + ## Number of retries to perform before giving up on sending a record + retries = 10 + # The kafka producer has a variety of possible configuration options defined at + # https://kafka.apache.org/documentation/#producerconfigs + # Some values are set to other values from this config by default: + # "bootstrap.servers" = brokers + # "buffer.memory" = buffer.byteLimit + # "linger.ms" = buffer.timeLimit + + #producerConf { + # acks = all + # "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + # "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer" + #} + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 1 MB + maxBytes = 1000000 + + # Incoming events are stored in a buffer before being sent to Kafka. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } } diff --git a/examples/config.kafka.minimal.hocon b/examples/config.kafka.minimal.hocon index 29ca6ff67..1547b5c1e 100644 --- a/examples/config.kafka.minimal.hocon +++ b/examples/config.kafka.minimal.hocon @@ -3,11 +3,13 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { - brokers = "localhost:9092,another.host:9092" + good { + name = "good" + brokers = "localhost:9092,another.host:9092" + } + bad { + name = "bad" + brokers = "localhost:9092,another.host:9092" } } -} +} \ No newline at end of file diff --git a/examples/config.kinesis.extended.hocon b/examples/config.kinesis.extended.hocon index 0c4bec090..3e931661b 100644 --- a/examples/config.kinesis.extended.hocon +++ b/examples/config.kinesis.extended.hocon @@ -163,26 +163,19 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = kinesis + good { + # Events which have successfully been collected will be stored in the good stream/topic + name = "good" + # Region where the streams are located region = "eu-central-1" @@ -193,15 +186,13 @@ collector { # Thread pool size for Kinesis and SQS API requests threadPoolSize = 10 - # Optional SQS buffer for good and bad events (respectively). + # Optional SQS buffer for good events. # When messages can't be sent to Kinesis, they will be sent to SQS. # If not configured, sending to Kinesis will be retried. # This should only be set up for the Kinesis sink, where it acts as a failsafe. # For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored. #sqsGoodBuffer = {{sqsGoodBuffer}} - #sqsBadBuffer = {{sqsBadBuffer}} - # Optional. Maximum number of bytes that a single record can contain. # If a record is bigger, a size violation bad row is emitted instead # Default: 192 kb @@ -242,19 +233,96 @@ collector { # This is the interval for the calls. # /sink-health is made healthy as soon as requests are successful or records are successfully inserted. startupCheckInterval = 1 second + + # Incoming events are stored in a buffer before being sent to Kinesis. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } - - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } + + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { + + name = "bad" + + # Region where the streams are located + region = "eu-central-1" + + ## Optional endpoint url configuration to override aws kinesis endpoints, + ## this can be used to specify local endpoints when using localstack + # customEndpoint = {{kinesisEndpoint}} + + # Thread pool size for Kinesis and SQS API requests + threadPoolSize = 10 + + # Optional SQS buffer for bad events. + # When messages can't be sent to Kinesis, they will be sent to SQS. + # If not configured, sending to Kinesis will be retried. + # This should only be set up for the Kinesis sink, where it acts as a failsafe. + # For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored. + #sqsBadBuffer = {{sqsBadBuffer}} + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 192 kb + # SQS has a record size limit of 256 kb, but records are encoded with Base64, + # which adds approximately 33% of the size, so we set the limit to 256 kb * 3/4 + sqsMaxBytes = 192000 + + # The following are used to authenticate for the Amazon Kinesis and SQS sinks. + # If both are set to 'default', the default provider chain is used + # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) + # If both are set to 'iam', use AWS IAM Roles to provision credentials. + # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + aws { + accessKey = iam + secretKey = iam + } + + # Optional + backoffPolicy { + # Minimum backoff period in milliseconds + minBackoff = 500 + # Maximum backoff period in milliseconds + maxBackoff = 1500 + # Failed inserts are retried forever. + # In case of just Kinesis without SQS, number of retries before setting /sink-health unhealthy. + # In case of Kinesis + SQS, number of retries with one before retrying with the other. + maxRetries = 3 + } + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 1 MB + # If SQS buffer is activated, sqsMaxBytes is used instead + maxBytes = 1000000 + + # When collector starts, it checks if Kinesis streams exist with describeStreamSummary + # and if SQS buffers exist with getQueueUrl (if configured). + # This is the interval for the calls. + # /sink-health is made healthy as soon as requests are successful or records are successfully inserted. + startupCheckInterval = 1 second + + # Incoming events are stored in a buffer before being sent to Kinesis. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.kinesis.minimal.hocon b/examples/config.kinesis.minimal.hocon index 2e0cb2314..9501390a5 100644 --- a/examples/config.kinesis.minimal.hocon +++ b/examples/config.kinesis.minimal.hocon @@ -3,10 +3,12 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { + good { + name = "good" + region = eu-central-1 + } + bad { + name = "bad" region = eu-central-1 } } diff --git a/examples/config.nsq.extended.hocon b/examples/config.nsq.extended.hocon index 88f05f2fc..4cc7f7f9d 100644 --- a/examples/config.nsq.extended.hocon +++ b/examples/config.nsq.extended.hocon @@ -163,27 +163,14 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters - bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = nsq - - # Or NSQ + # Events which have successfully been collected will be stored in the good stream/topic + good { + name = "good" ## Host name for nsqd host = "nsqHost" ## TCP port for nsqd, 4150 by default @@ -194,6 +181,23 @@ collector { # Default: 1 MB maxBytes = 1000000 } + + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { + name = "bad" + ## Host name for nsqd + host = "nsqHost" + ## TCP port for nsqd, 4150 by default + port = 4150 + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 1 MB + maxBytes = 1000000 + } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.nsq.minimal.hocon b/examples/config.nsq.minimal.hocon index 97682cb1d..2b7afa7ca 100644 --- a/examples/config.nsq.minimal.hocon +++ b/examples/config.nsq.minimal.hocon @@ -3,10 +3,13 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { + good { + name = "good" + host = "nsqHost" + } + + bad { + name = "bad" host = "nsqHost" } } diff --git a/examples/config.pubsub.extended.hocon b/examples/config.pubsub.extended.hocon index 63c050abe..d8cd37b28 100644 --- a/examples/config.pubsub.extended.hocon +++ b/examples/config.pubsub.extended.hocon @@ -163,27 +163,16 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters - bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = google-pub-sub - + # Events which have successfully been collected will be stored in the good stream/topic + good { + name = "good" + googleProjectId = "google-project-id" ## Minimum, maximum and total backoff periods, in milliseconds ## and multiplier between two backoff @@ -211,20 +200,68 @@ collector { # In case of failure of these retries, the events are added to a buffer # and every retryInterval collector retries to send them. retryInterval = 10 seconds + + + # Incoming events are stored in a buffer before being sent to Pubsub. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 100000 + recordLimit = 40 + timeLimit = 1000 + } } + + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { + name = "bad" + + googleProjectId = "google-project-id" + ## Minimum, maximum and total backoff periods, in milliseconds + ## and multiplier between two backoff + backoffPolicy { + minBackoff = 1000 + maxBackoff = 1000 + totalBackoff = 9223372036854 + multiplier = 2 + initialRpcTimeout = 10000 + maxRpcTimeout = 10000 + rpcTimeoutMultiplier = 2 + } - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 100000 - recordLimit = 40 - timeLimit = 1000 - } + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 10 MB + maxBytes = 10000000 + + # Optional. When collector starts, it checks if PubSub topics exist with listTopics. + # This is the interval for the calls. + # /sink-health is made healthy as soon as requests are successful or records are successfully inserted. + startupCheckInterval = 1 second + + # Optional. Collector uses built-in retry mechanism of PubSub API. + # In case of failure of these retries, the events are added to a buffer + # and every retryInterval collector retries to send them. + retryInterval = 10 seconds + + + # Incoming events are stored in a buffer before being sent to Pubsub. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 100000 + recordLimit = 40 + timeLimit = 1000 + } + } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.pubsub.minimal.hocon b/examples/config.pubsub.minimal.hocon index fb06f3aba..b6fdb8d05 100644 --- a/examples/config.pubsub.minimal.hocon +++ b/examples/config.pubsub.minimal.hocon @@ -3,10 +3,12 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { + good { + name = "good" + googleProjectId = "google-project-id" + } + bad { + name = "bad" googleProjectId = "google-project-id" } } diff --git a/examples/config.sqs.extended.hocon b/examples/config.sqs.extended.hocon index 452c09d2c..07e2d2e99 100644 --- a/examples/config.sqs.extended.hocon +++ b/examples/config.sqs.extended.hocon @@ -163,26 +163,15 @@ collector { } streams { - # Events which have successfully been collected will be stored in the good stream/topic - good = "good" - - # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. - # The collector can currently produce two flavours of bad row: - # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; - # - a generic_error if a request's querystring cannot be parsed because of illegal characters - bad = "bad" # Whether to use the incoming event's ip as the partition key for the good stream/topic # Note: Nsq does not make use of partition key. useIpAddressAsPartitionKey = false - # Enable the chosen sink by uncommenting the appropriate configuration - sink { - # Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout. - # To use stdout, comment or remove everything in the "collector.streams.sink" section except - # "enabled" which should be set to "stdout". - enabled = sqs + # Events which have successfully been collected will be stored in the good stream/topic + good { + name = "good" # Region where the streams are located region = "eu-central-1" @@ -211,19 +200,68 @@ collector { # This is the interval for the calls. # /sink-health is made healthy as soon as requests are successful or records are successfully inserted. startupCheckInterval = 1 second + + # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. + # Note: Buffering is not supported by NSQ. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } + + # Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic. + # The collector can currently produce two flavours of bad row: + # - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits; + # - a generic_error if a request's querystring cannot be parsed because of illegal characters + bad { - # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. - # Note: Buffering is not supported by NSQ. - # The buffer is emptied whenever: - # - the number of stored records reaches record-limit or - # - the combined size of the stored records reaches byte-limit or - # - the time in milliseconds since the buffer was last emptied reaches time-limit - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } + name = "bad" + # Region where the streams are located + region = "eu-central-1" + + # Thread pool size for Kinesis and SQS API requests + threadPoolSize = 10 + + # Optional + backoffPolicy { + # Minimum backoff period in milliseconds + minBackoff = 500 + # Maximum backoff period in milliseconds + maxBackoff = 1500 + # Failed inserts are retried forever. + # Number of retries before setting /sink-health unhealthy. + maxRetries = 3 + } + + # Optional. Maximum number of bytes that a single record can contain. + # If a record is bigger, a size violation bad row is emitted instead + # Default: 192 kb + # SQS has a record size limit of 256 kb, but records are encoded with Base64, + # which adds approximately 33% of the size, so we set the limit to 256 kb * 3/4 + maxBytes = 192000 + + # When collector starts, it checks if SQS buffers exist with getQueueUrl. + # This is the interval for the calls. + # /sink-health is made healthy as soon as requests are successful or records are successfully inserted. + startupCheckInterval = 1 second + + # Incoming events are stored in a buffer before being sent to Kinesis/Kafka. + # Note: Buffering is not supported by NSQ. + # The buffer is emptied whenever: + # - the number of stored records reaches record-limit or + # - the combined size of the stored records reaches byte-limit or + # - the time in milliseconds since the buffer was last emptied reaches time-limit + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } } # Telemetry sends heartbeat events to external pipeline. diff --git a/examples/config.sqs.minimal.hocon b/examples/config.sqs.minimal.hocon index 2e0cb2314..9501390a5 100644 --- a/examples/config.sqs.minimal.hocon +++ b/examples/config.sqs.minimal.hocon @@ -3,10 +3,12 @@ collector { port = 8080 streams { - good = "good" - bad = "bad" - - sink { + good { + name = "good" + region = eu-central-1 + } + bad { + name = "bad" region = eu-central-1 } } diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf index 929d36685..96dfd594f 100644 --- a/http4s/src/main/resources/reference.conf +++ b/http4s/src/main/resources/reference.conf @@ -51,12 +51,6 @@ streams { useIpAddressAsPartitionKey = false - - buffer { - byteLimit = 3145728 - recordLimit = 500 - timeLimit = 5000 - } } telemetry { diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala index 23b614458..22ee2e25f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/App.scala @@ -12,7 +12,7 @@ import com.snowplowanalytics.snowplow.scalatracker.emitters.http4s.ceTracking import com.snowplowanalytics.snowplow.collector.core.model.Sinks -abstract class App[SinkConfig <: Config.Sink: Decoder](appInfo: AppInfo) +abstract class App[SinkConfig: Decoder](appInfo: AppInfo) extends CommandIOApp( name = App.helpCommand(appInfo), header = "Snowplow application that collects tracking events", diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index cf5bacdcb..86567becc 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -86,16 +86,12 @@ object Config { ) case class Streams[+SinkConfig]( - good: String, - bad: String, - useIpAddressAsPartitionKey: Boolean, - sink: SinkConfig, - buffer: Buffer + good: Sink[SinkConfig], + bad: Sink[SinkConfig], + useIpAddressAsPartitionKey: Boolean ) - trait Sink { - val maxBytes: Int - } + final case class Sink[+SinkConfig](name: String, buffer: Buffer, config: SinkConfig) case class Buffer( byteLimit: Long, @@ -166,15 +162,57 @@ object Config { implicit val redirectMacro = deriveDecoder[RedirectMacro] implicit val rootResponse = deriveDecoder[RootResponse] implicit val cors = deriveDecoder[CORS] - implicit val buffer = deriveDecoder[Buffer] - implicit val streams = deriveDecoder[Streams[SinkConfig]] implicit val statsd = deriveDecoder[Statsd] implicit val metrics = deriveDecoder[Metrics] implicit val monitoring = deriveDecoder[Monitoring] implicit val ssl = deriveDecoder[SSL] implicit val telemetry = deriveDecoder[Telemetry] implicit val networking = deriveDecoder[Networking] + implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) + implicit val streams = deriveDecoder[Streams[SinkConfig]] + deriveDecoder[Config[SinkConfig]] } + implicit private val buffer: Decoder[Buffer] = deriveDecoder[Buffer] + + /** + * streams { + * good { + * name: "good-name" + * buffer {...} + * // rest of the sink config... + * } + * bad { + * name: "bad-name" + * buffer {...} + * // rest of the sink config... + * } + * } + */ + private def newDecoder[SinkConfig: Decoder]: Decoder[Sink[SinkConfig]] = + Decoder.instance { cursor => // cursor is at 'good'/'bad' section level + for { + sinkName <- cursor.get[String]("name") + config <- cursor.as[SinkConfig] + buffer <- cursor.get[Buffer]("buffer") + } yield Sink(sinkName, buffer, config) + } + + /** + * streams { + * good = "good-name" + * bad = "bad-name" + * buffer {...} //shared by good and bad + * sink {...} //shared by good and bad + * } + */ + private def legacyDecoder[SinkConfig: Decoder]: Decoder[Sink[SinkConfig]] = + Decoder.instance { cursor => //cursor is at 'good'/'bad' section level + for { + sinkName <- cursor.as[String] + config <- cursor.up.get[SinkConfig]("sink") //up first to the 'streams' section + buffer <- cursor.up.get[Buffer]("buffer") //up first to the 'streams' section + } yield Sink(sinkName, buffer, config) + } } diff --git a/http4s/src/test/resources/test-config-new-style.hocon b/http4s/src/test/resources/test-config-new-style.hocon new file mode 100644 index 000000000..06b3ba962 --- /dev/null +++ b/http4s/src/test/resources/test-config-new-style.hocon @@ -0,0 +1,36 @@ +collector { + interface = "0.0.0.0" + port = 8080 + + streams { + good { + name = "good" + + foo = "hello" + bar = "world" + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } + + bad { + name = "bad" + + foo = "hello" + bar = "world" + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } + } + } + + ssl { + enable = true + } +} diff --git a/http4s/src/test/resources/test-config.hocon b/http4s/src/test/resources/test-config-old-style.hocon similarity index 66% rename from http4s/src/test/resources/test-config.hocon rename to http4s/src/test/resources/test-config-old-style.hocon index 71202d62f..8d2e06598 100644 --- a/http4s/src/test/resources/test-config.hocon +++ b/http4s/src/test/resources/test-config-old-style.hocon @@ -10,6 +10,12 @@ collector { foo = "hello" bar = "world" } + + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 + } } ssl { diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala index 8106ab345..310df4365 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/ConfigParserSpec.scala @@ -1,40 +1,60 @@ package com.snowplowanalytics.snowplow.collector.core import java.nio.file.Paths - import org.specs2.mutable.Specification - import cats.effect.IO - import cats.effect.testing.specs2.CatsEffect - +import com.snowplowanalytics.snowplow.collector.core.Config.Buffer import io.circe.generic.semiauto._ class ConfigParserSpec extends Specification with CatsEffect { "Loading the configuration" should { - "use reference.conf and the hocon specified in the path" in { - case class SinkConfig(foo: String, bar: String) - implicit val decoder = deriveDecoder[SinkConfig] - - val path = Paths.get(getClass.getResource(("/test-config.hocon")).toURI()) + "use reference.conf and the hocon specified in the path" >> { + "for new-style config" in { + assert(resource = "/test-config-new-style.hocon") + } + "for old-style config" in { + assert(resource = "/test-config-old-style.hocon") + } + } + } - val expectedStreams = Config.Streams[SinkConfig]( - "good", - "bad", - TestUtils.testConfig.streams.useIpAddressAsPartitionKey, - SinkConfig("hello", "world"), - TestUtils.testConfig.streams.buffer + private def assert(resource: String) = { + case class SinkConfig(foo: String, bar: String) + implicit val decoder = deriveDecoder[SinkConfig] + + val path = Paths.get(getClass.getResource(resource).toURI) + + val expectedStreams = Config.Streams[SinkConfig]( + good = Config.Sink( + name = "good", + buffer = Buffer( + 3145728, + 500, + 5000 + ), + SinkConfig("hello", "world") + ), + bad = Config.Sink( + name = "bad", + buffer = Buffer( + 3145728, + 500, + 5000 + ), + SinkConfig("hello", "world") + ), + TestUtils.testConfig.streams.useIpAddressAsPartitionKey + ) + val expected = TestUtils + .testConfig + .copy[SinkConfig]( + paths = Map.empty[String, String], + streams = expectedStreams, + ssl = TestUtils.testConfig.ssl.copy(enable = true) ) - val expected = TestUtils - .testConfig - .copy[SinkConfig]( - paths = Map.empty[String, String], - streams = expectedStreams, - ssl = TestUtils.testConfig.ssl.copy(enable = true) - ) - ConfigParser.fromPath[IO, SinkConfig](Some(path)).value.map(_ should beRight(expected)) - } + ConfigParser.fromPath[IO, SinkConfig](Some(path)).value.map(_ should beRight(expected)) } } diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 647871ee4..3647ec7d3 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -75,15 +75,25 @@ object TestUtils { ), cors = CORS(60.minutes), streams = Streams( - "raw", - "bad-1", - false, - AnyRef, - Buffer( - 3145728, - 500, - 5000 - ) + good = Sink( + name = "raw", + Buffer( + 3145728, + 500, + 5000 + ), + AnyRef + ), + bad = Sink( + name = "bad-1", + Buffer( + 3145728, + 500, + 5000 + ), + AnyRef + ), + useIpAddressAsPartitionKey = false ), monitoring = Monitoring( Metrics( diff --git a/kafka/src/it/resources/collector.hocon b/kafka/src/it/resources/collector.hocon index 78fd2c372..2468a977b 100644 --- a/kafka/src/it/resources/collector.hocon +++ b/kafka/src/it/resources/collector.hocon @@ -3,10 +3,13 @@ collector { port = ${PORT} streams { - good = ${TOPIC_GOOD} - bad = ${TOPIC_BAD} - - sink { + good { + name = ${TOPIC_GOOD} + brokers = ${BROKER} + maxBytes = ${MAX_BYTES} + } + bad { + name = ${TOPIC_BAD} brokers = ${BROKER} maxBytes = ${MAX_BYTES} } diff --git a/kafka/src/main/resources/application.conf b/kafka/src/main/resources/application.conf index 80182aeec..275fd19d1 100644 --- a/kafka/src/main/resources/application.conf +++ b/kafka/src/main/resources/application.conf @@ -1,12 +1,19 @@ collector { streams { + + //New object-like style + good = ${collector.streams.sink} + bad = ${collector.streams.sink} + + //Legacy style sink { - enabled = kafka threadPoolSize = 10 retries = 10 maxBytes = 1000000 + buffer = ${collector.streams.buffer} } + //Legacy style buffer { byteLimit = 3145728 recordLimit = 500 diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala index bd20c16e8..30db6f05c 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaCollector.scala @@ -23,18 +23,8 @@ object KafkaCollector extends App[KafkaSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[KafkaSinkConfig]): Resource[IO, Sinks[IO]] = for { - good <- KafkaSink.create[IO]( - config.sink.maxBytes, - config.good, - config.sink, - config.buffer - ) - bad <- KafkaSink.create[IO]( - config.sink.maxBytes, - config.bad, - config.sink, - config.buffer - ) + good <- KafkaSink.create[IO](config.good) + bad <- KafkaSink.create[IO](config.bad) } yield Sinks(good, bad) override def telemetryInfo(config: Config.Streams[KafkaSinkConfig]): IO[Telemetry.TelemetryInfo] = diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala index 8c20858f1..88037674b 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSink.scala @@ -64,14 +64,11 @@ class KafkaSink[F[_]: Sync]( object KafkaSink { def create[F[_]: Sync]( - maxBytes: Int, - topicName: String, - kafkaConfig: KafkaSinkConfig, - bufferConfig: Config.Buffer + sinkConfig: Config.Sink[KafkaSinkConfig] ): Resource[F, KafkaSink[F]] = for { - kafkaProducer <- createProducer(kafkaConfig, bufferConfig) - kafkaSink = new KafkaSink(maxBytes, kafkaProducer, topicName) + kafkaProducer <- createProducer(sinkConfig.config, sinkConfig.buffer) + kafkaSink = new KafkaSink(sinkConfig.config.maxBytes, kafkaProducer, sinkConfig.name) } yield kafkaSink /** diff --git a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala index 676a5259d..ee4ede0cb 100644 --- a/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala +++ b/kafka/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KafkaSinkConfig.scala @@ -3,14 +3,12 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks import io.circe.Decoder import io.circe.generic.semiauto._ -import com.snowplowanalytics.snowplow.collector.core.Config - final case class KafkaSinkConfig( maxBytes: Int, brokers: String, retries: Int, producerConf: Option[Map[String, String]] -) extends Config.Sink +) object KafkaSinkConfig { implicit val configDecoder: Decoder[KafkaSinkConfig] = deriveDecoder[KafkaSinkConfig] diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 703fd1563..3d05a7681 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -107,20 +107,35 @@ object KafkaConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, streams = Config.Streams( - good = "good", - bad = "bad", - useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = KafkaSinkConfig( + maxBytes = 1000000, + brokers = "localhost:9092,another.host:9092", + retries = 10, + producerConf = None + ) ), - sink = KafkaSinkConfig( - maxBytes = 1000000, - brokers = "localhost:9092,another.host:9092", - retries = 10, - producerConf = None - ) + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = KafkaSinkConfig( + maxBytes = 1000000, + brokers = "localhost:9092,another.host:9092", + retries = 10, + producerConf = None + ) + ), + useIpAddressAsPartitionKey = false ), telemetry = Config.Telemetry( disable = false, diff --git a/kinesis/src/it/resources/collector-cookie-anonymous.hocon b/kinesis/src/it/resources/collector-cookie-anonymous.hocon index 55d7c4992..14f4ed802 100644 --- a/kinesis/src/it/resources/collector-cookie-anonymous.hocon +++ b/kinesis/src/it/resources/collector-cookie-anonymous.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-cookie-attributes-1.hocon b/kinesis/src/it/resources/collector-cookie-attributes-1.hocon index 3ad47e0b3..e661116da 100644 --- a/kinesis/src/it/resources/collector-cookie-attributes-1.hocon +++ b/kinesis/src/it/resources/collector-cookie-attributes-1.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-cookie-attributes-2.hocon b/kinesis/src/it/resources/collector-cookie-attributes-2.hocon index 55d7c4992..14f4ed802 100644 --- a/kinesis/src/it/resources/collector-cookie-attributes-2.hocon +++ b/kinesis/src/it/resources/collector-cookie-attributes-2.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-cookie-domain.hocon b/kinesis/src/it/resources/collector-cookie-domain.hocon index d8bdbdc4b..4a7eaee7c 100644 --- a/kinesis/src/it/resources/collector-cookie-domain.hocon +++ b/kinesis/src/it/resources/collector-cookie-domain.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-cookie-fallback.hocon b/kinesis/src/it/resources/collector-cookie-fallback.hocon index ecef93c0a..8c9c874f6 100644 --- a/kinesis/src/it/resources/collector-cookie-fallback.hocon +++ b/kinesis/src/it/resources/collector-cookie-fallback.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-cookie-no-domain.hocon b/kinesis/src/it/resources/collector-cookie-no-domain.hocon index 55d7c4992..14f4ed802 100644 --- a/kinesis/src/it/resources/collector-cookie-no-domain.hocon +++ b/kinesis/src/it/resources/collector-cookie-no-domain.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-custom-paths.hocon b/kinesis/src/it/resources/collector-custom-paths.hocon index f588fb1b6..a39c6d87d 100644 --- a/kinesis/src/it/resources/collector-custom-paths.hocon +++ b/kinesis/src/it/resources/collector-custom-paths.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-doNotTrackCookie-disabled.hocon b/kinesis/src/it/resources/collector-doNotTrackCookie-disabled.hocon index bf16f99a1..6f6f54155 100644 --- a/kinesis/src/it/resources/collector-doNotTrackCookie-disabled.hocon +++ b/kinesis/src/it/resources/collector-doNotTrackCookie-disabled.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector-doNotTrackCookie-enabled.hocon b/kinesis/src/it/resources/collector-doNotTrackCookie-enabled.hocon index 5415d8263..0604641ae 100644 --- a/kinesis/src/it/resources/collector-doNotTrackCookie-enabled.hocon +++ b/kinesis/src/it/resources/collector-doNotTrackCookie-enabled.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/it/resources/collector.hocon b/kinesis/src/it/resources/collector.hocon index 177f7e673..0183b1258 100644 --- a/kinesis/src/it/resources/collector.hocon +++ b/kinesis/src/it/resources/collector.hocon @@ -3,10 +3,21 @@ collector { port = ${PORT} streams { - good = ${STREAM_GOOD} - bad = ${STREAM_BAD} + good { + name = ${STREAM_GOOD} + region = ${REGION} + customEndpoint = ${KINESIS_ENDPOINT} + + aws { + accessKey = env + secretKey = env + } - sink { + maxBytes = ${MAX_BYTES} + } + + bad { + name = ${STREAM_BAD} region = ${REGION} customEndpoint = ${KINESIS_ENDPOINT} diff --git a/kinesis/src/main/resources/application.conf b/kinesis/src/main/resources/application.conf index 49ee01e22..1cc2c0596 100644 --- a/kinesis/src/main/resources/application.conf +++ b/kinesis/src/main/resources/application.conf @@ -1,7 +1,11 @@ { streams { + //New object-like style + good = ${streams.sink} + bad = ${streams.sink} + + //Legacy style sink { - enabled = kinesis threadPoolSize = 10 aws { @@ -19,8 +23,10 @@ sqsMaxBytes = 192000 startupCheckInterval = 1 second + buffer = ${streams.buffer} } + //Legacy style buffer { byteLimit = 3145728 recordLimit = 500 diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala index 77be937cc..500dda1ec 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KinesisCollector.scala @@ -29,24 +29,10 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) { private lazy val log = LoggerFactory.getLogger(getClass) override def mkSinks(config: Config.Streams[KinesisSinkConfig]): Resource[IO, Sinks[IO]] = { - val threadPoolExecutor = buildExecutorService(config.sink) + val threadPoolExecutor = buildExecutorService(config.good.config) for { - good <- KinesisSink.create[IO]( - kinesisMaxBytes = config.sink.maxBytes, - kinesisConfig = config.sink, - bufferConfig = config.buffer, - streamName = config.good, - sqsBufferName = config.sink.sqsGoodBuffer, - threadPoolExecutor - ) - bad <- KinesisSink.create[IO]( - kinesisMaxBytes = config.sink.maxBytes, - kinesisConfig = config.sink, - bufferConfig = config.buffer, - streamName = config.bad, - sqsBufferName = config.sink.sqsBadBuffer, - threadPoolExecutor - ) + good <- KinesisSink.create[IO](config.good, config.good.config.sqsGoodBuffer, threadPoolExecutor) + bad <- KinesisSink.create[IO](config.bad, config.good.config.sqsBadBuffer, threadPoolExecutor) } yield Sinks(good, bad) } @@ -55,7 +41,7 @@ object KinesisCollector extends App[KinesisSinkConfig](BuildInfo) { .getAccountId(config) .map(id => Telemetry.TelemetryInfo( - region = Some(config.sink.region), + region = Some(config.good.config.region), cloud = Some("AWS"), unhashedInstallationId = id ) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala index f303d8cb0..e70d34ead 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala @@ -10,11 +10,11 @@ object TelemetryUtils { def getAccountId(config: Config.Streams[KinesisSinkConfig]): IO[Option[String]] = Resource .make( - IO(KinesisSink.createKinesisClient(config.sink.endpoint, config.sink.region)).rethrow + IO(KinesisSink.createKinesisClient(config.good.config.endpoint, config.good.config.region)).rethrow )(c => IO(c.shutdown())) .use { kinesis => IO { - val streamArn = KinesisSink.describeStream(kinesis, config.good).getStreamARN + val streamArn = KinesisSink.describeStream(kinesis, config.good.name).getStreamARN Some(extractAccountId(streamArn)) } } diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala index 772f9281e..55c770cbb 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSink.scala @@ -422,17 +422,14 @@ object KinesisSink { * during its construction. */ def create[F[_]: Sync]( - kinesisMaxBytes: Int, - kinesisConfig: KinesisSinkConfig, - bufferConfig: Config.Buffer, - streamName: String, + sinkConfig: Config.Sink[KinesisSinkConfig], sqsBufferName: Option[String], executorService: ScheduledExecutorService ): Resource[F, KinesisSink[F]] = { val acquire = Sync[F] .delay( - createAndInitialize(kinesisMaxBytes, kinesisConfig, bufferConfig, streamName, sqsBufferName, executorService) + createAndInitialize(sinkConfig, sqsBufferName, executorService) ) .rethrow val release = (sink: KinesisSink[F]) => Sync[F].delay(sink.shutdown()) @@ -471,29 +468,26 @@ object KinesisSink { * during its construction. */ private def createAndInitialize[F[_]: Sync]( - kinesisMaxBytes: Int, - kinesisConfig: KinesisSinkConfig, - bufferConfig: Config.Buffer, - streamName: String, + sinkConfig: Config.Sink[KinesisSinkConfig], sqsBufferName: Option[String], executorService: ScheduledExecutorService ): Either[Throwable, KinesisSink[F]] = { val clients = for { - kinesisClient <- createKinesisClient(kinesisConfig.endpoint, kinesisConfig.region) - sqsClientAndName <- sqsBuffer(sqsBufferName, kinesisConfig.region) + kinesisClient <- createKinesisClient(sinkConfig.config.endpoint, sinkConfig.config.region) + sqsClientAndName <- sqsBuffer(sqsBufferName, sinkConfig.config.region) } yield (kinesisClient, sqsClientAndName) clients.map { case (kinesisClient, sqsClientAndName) => val maxBytes = - if (sqsClientAndName.isDefined) kinesisConfig.sqsMaxBytes else kinesisMaxBytes + if (sqsClientAndName.isDefined) sinkConfig.config.sqsMaxBytes else sinkConfig.config.maxBytes val ks = new KinesisSink( maxBytes, kinesisClient, - kinesisConfig, - bufferConfig, - streamName, + sinkConfig.config, + sinkConfig.buffer, + sinkConfig.name, executorService, sqsClientAndName ) diff --git a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala index 8826c6b4b..bf6eb0219 100644 --- a/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala +++ b/kinesis/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisSinkConfig.scala @@ -1,6 +1,5 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks -import com.snowplowanalytics.snowplow.collector.core.Config import io.circe.Decoder import io.circe.generic.semiauto._ import io.circe.config.syntax.durationDecoder @@ -17,7 +16,7 @@ final case class KinesisSinkConfig( sqsBadBuffer: Option[String], sqsMaxBytes: Int, startupCheckInterval: FiniteDuration -) extends Config.Sink { +) { val endpoint = customEndpoint.getOrElse(region match { case cn @ "cn-north-1" => s"https://kinesis.$cn.amazonaws.com.cn" case cn @ "cn-northwest-1" => s"https://kinesis.$cn.amazonaws.com.cn" diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index 3196fc84b..ed84fa80b 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -115,28 +115,52 @@ object KinesisConfigSpec { idleTimeout = 610.seconds ), streams = Config.Streams( - good = "good", - bad = "bad", useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = KinesisSinkConfig( + maxBytes = 1000000, + region = "eu-central-1", + threadPoolSize = 10, + backoffPolicy = KinesisSinkConfig.BackoffPolicy( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + sqsBadBuffer = None, + sqsGoodBuffer = None, + sqsMaxBytes = 192000, + customEndpoint = None, + startupCheckInterval = 1.second + ) ), - sink = KinesisSinkConfig( - maxBytes = 1000000, - region = "eu-central-1", - threadPoolSize = 10, - backoffPolicy = KinesisSinkConfig.BackoffPolicy( - minBackoff = 500, - maxBackoff = 1500, - maxRetries = 3 + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 ), - sqsBadBuffer = None, - sqsGoodBuffer = None, - sqsMaxBytes = 192000, - customEndpoint = None, - startupCheckInterval = 1.second + config = KinesisSinkConfig( + maxBytes = 1000000, + region = "eu-central-1", + threadPoolSize = 10, + backoffPolicy = KinesisSinkConfig.BackoffPolicy( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + sqsBadBuffer = None, + sqsGoodBuffer = None, + sqsMaxBytes = 192000, + customEndpoint = None, + startupCheckInterval = 1.second + ) ) ), telemetry = Config.Telemetry( diff --git a/nsq/src/main/resources/application.conf b/nsq/src/main/resources/application.conf index 1df27cd22..bd867ae8a 100644 --- a/nsq/src/main/resources/application.conf +++ b/nsq/src/main/resources/application.conf @@ -1,10 +1,14 @@ collector { streams { + + good = ${collector.streams.sink} + bad = ${collector.streams.sink} + sink { - enabled = nsq threadPoolSize = 10 port = 4150 maxBytes = 1000000 + buffer = ${collector.streams.buffer} } buffer { diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala index b6fb40109..f66568a4a 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqCollector.scala @@ -22,16 +22,8 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ object NsqCollector extends App[NsqSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[NsqSinkConfig]): Resource[IO, Sinks[IO]] = for { - good <- NsqSink.create[IO]( - config.sink.maxBytes, - config.sink, - config.good - ) - bad <- NsqSink.create[IO]( - config.sink.maxBytes, - config.sink, - config.bad - ) + good <- NsqSink.create[IO](config.good) + bad <- NsqSink.create[IO](config.bad) } yield Sinks(good, bad) override def telemetryInfo(config: Config.Streams[NsqSinkConfig]): IO[Telemetry.TelemetryInfo] = diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala index 682761f4b..7a8d20c78 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSink.scala @@ -20,14 +20,11 @@ package com.snowplowanalytics.snowplow.collectors.scalastream package sinks import java.util.concurrent.TimeoutException - import scala.collection.JavaConverters._ - import cats.effect.{Resource, Sync} import cats.implicits._ - import com.snowplowanalytics.client.nsq.NSQProducer -import com.snowplowanalytics.snowplow.collector.core.{Sink} +import com.snowplowanalytics.snowplow.collector.core.{Config, Sink} import com.snowplowanalytics.client.nsq.exceptions.NSQException /** @@ -65,13 +62,11 @@ class NsqSink[F[_]: Sync] private ( object NsqSink { def create[F[_]: Sync]( - maxBytes: Int, - nsqConfig: NsqSinkConfig, - topicName: String + nsqConfig: Config.Sink[NsqSinkConfig] ): Resource[F, NsqSink[F]] = Resource.make( Sync[F].delay( - new NsqSink(maxBytes, nsqConfig, topicName) + new NsqSink(nsqConfig.config.maxBytes, nsqConfig.config, nsqConfig.name) ) )(sink => Sync[F].delay(sink.shutdown())) } diff --git a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala index 6a050aeeb..b6fe8557b 100644 --- a/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala +++ b/nsq/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/NsqSinkConfig.scala @@ -22,14 +22,12 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks import io.circe.Decoder import io.circe.generic.semiauto._ -import com.snowplowanalytics.snowplow.collector.core.Config - final case class NsqSinkConfig( maxBytes: Int, threadPoolSize: Int, host: String, port: Int -) extends Config.Sink +) object NsqSinkConfig { implicit val configDecoder: Decoder[NsqSinkConfig] = deriveDecoder[NsqSinkConfig] diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index 6068295a2..30ed06cea 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -106,19 +106,34 @@ object NsqConfigSpec { redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, streams = Config.Streams( - good = "good", - bad = "bad", useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = NsqSinkConfig( + maxBytes = 1000000, + threadPoolSize = 10, + host = "nsqHost", + port = 4150 + ) ), - sink = NsqSinkConfig( - maxBytes = 1000000, - threadPoolSize = 10, - host = "nsqHost", - port = 4150 + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = NsqSinkConfig( + maxBytes = 1000000, + threadPoolSize = 10, + host = "nsqHost", + port = 4150 + ) ) ), telemetry = Config.Telemetry( diff --git a/pubsub/src/it/resources/collector.hocon b/pubsub/src/it/resources/collector.hocon index 923d10e56..d964fbe56 100644 --- a/pubsub/src/it/resources/collector.hocon +++ b/pubsub/src/it/resources/collector.hocon @@ -3,12 +3,15 @@ collector { port = ${PORT} streams { - good = ${TOPIC_GOOD} - bad = ${TOPIC_BAD} - - sink { - googleProjectId = ${GOOGLE_PROJECT_ID} - maxBytes = ${MAX_BYTES} + good { + name = ${TOPIC_GOOD} + googleProjectId = ${GOOGLE_PROJECT_ID} + maxBytes = ${MAX_BYTES} + } + bad { + name = ${TOPIC_BAD} + googleProjectId = ${GOOGLE_PROJECT_ID} + maxBytes = ${MAX_BYTES} } } } \ No newline at end of file diff --git a/pubsub/src/main/resources/application.conf b/pubsub/src/main/resources/application.conf index e75d8f90e..6b33a1d32 100644 --- a/pubsub/src/main/resources/application.conf +++ b/pubsub/src/main/resources/application.conf @@ -1,7 +1,10 @@ { streams { + //New object-like style + good = ${streams.sink} + bad = ${streams.sink} + sink { - enabled = google-pub-sub threadPoolSize = 10 backoffPolicy { @@ -18,6 +21,7 @@ startupCheckInterval = 1 second retryInterval = 10 seconds + buffer = ${streams.buffer} } buffer { diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala index 026caf030..8d8526660 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/PubSubCollector.scala @@ -10,8 +10,8 @@ object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[PubSubSinkConfig]): Resource[IO, Sinks[IO]] = for { - good <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.good) - bad <- PubSubSink.create[IO](config.sink.maxBytes, config.sink, config.buffer, config.bad) + good <- PubSubSink.create[IO](config.good) + bad <- PubSubSink.create[IO](config.bad) } yield Sinks(good, bad) override def telemetryInfo(config: Config.Streams[PubSubSinkConfig]): IO[Telemetry.TelemetryInfo] = @@ -19,7 +19,7 @@ object PubSubCollector extends App[PubSubSinkConfig](BuildInfo) { Telemetry.TelemetryInfo( region = None, cloud = Some("GCP"), - unhashedInstallationId = Some(config.sink.googleProjectId) + unhashedInstallationId = Some(config.good.config.googleProjectId) ) ) } diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala index 605d23a7c..750c6df02 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSink.scala @@ -84,21 +84,18 @@ object PubSubSink { } def create[F[_]: Async: Parallel]( - maxBytes: Int, - sinkConfig: PubSubSinkConfig, - bufferConfig: Config.Buffer, - topicName: String + sinkConfig: Config.Sink[PubSubSinkConfig] ): Resource[F, Sink[F]] = for { isHealthyState <- Resource.eval(Ref.of[F, Boolean](false)) - producer <- createProducer[F](sinkConfig, topicName, bufferConfig) - _ <- PubSubHealthCheck.run(isHealthyState, sinkConfig, topicName) + producer <- createProducer[F](sinkConfig.config, sinkConfig.name, sinkConfig.buffer) + _ <- PubSubHealthCheck.run(isHealthyState, sinkConfig.config, sinkConfig.name) } yield new PubSubSink( - maxBytes, + sinkConfig.config.maxBytes, isHealthyState, producer, - sinkConfig.retryInterval, - topicName + sinkConfig.config.retryInterval, + sinkConfig.name ) private def createProducer[F[_]: Async: Parallel]( diff --git a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala index d8c92955b..d467121bd 100644 --- a/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala +++ b/pubsub/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/PubSubSinkConfig.scala @@ -1,6 +1,5 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks -import com.snowplowanalytics.snowplow.collector.core.Config import com.snowplowanalytics.snowplow.collectors.scalastream.sinks.PubSubSinkConfig.BackoffPolicy import io.circe.Decoder import io.circe.config.syntax.durationDecoder @@ -14,7 +13,7 @@ final case class PubSubSinkConfig( backoffPolicy: BackoffPolicy, startupCheckInterval: FiniteDuration, retryInterval: FiniteDuration -) extends Config.Sink +) object PubSubSinkConfig { diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index c3fc77eee..2f3de5474 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -100,28 +100,52 @@ object ConfigSpec { idleTimeout = 610.seconds ), streams = Config.Streams( - good = "good", - bad = "bad", useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 100000, - recordLimit = 40, - timeLimit = 1000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 100000, + recordLimit = 40, + timeLimit = 1000 + ), + config = PubSubSinkConfig( + maxBytes = 10000000, + googleProjectId = "google-project-id", + backoffPolicy = PubSubSinkConfig.BackoffPolicy( + minBackoff = 1000, + maxBackoff = 1000, + totalBackoff = 9223372036854L, + multiplier = 2, + initialRpcTimeout = 10000, + maxRpcTimeout = 10000, + rpcTimeoutMultiplier = 2 + ), + startupCheckInterval = 1.second, + retryInterval = 10.seconds + ) ), - sink = PubSubSinkConfig( - maxBytes = 10000000, - googleProjectId = "google-project-id", - backoffPolicy = PubSubSinkConfig.BackoffPolicy( - minBackoff = 1000, - maxBackoff = 1000, - totalBackoff = 9223372036854L, - multiplier = 2, - initialRpcTimeout = 10000, - maxRpcTimeout = 10000, - rpcTimeoutMultiplier = 2 + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 100000, + recordLimit = 40, + timeLimit = 1000 ), - startupCheckInterval = 1.second, - retryInterval = 10.seconds + config = PubSubSinkConfig( + maxBytes = 10000000, + googleProjectId = "google-project-id", + backoffPolicy = PubSubSinkConfig.BackoffPolicy( + minBackoff = 1000, + maxBackoff = 1000, + totalBackoff = 9223372036854L, + multiplier = 2, + initialRpcTimeout = 10000, + maxRpcTimeout = 10000, + rpcTimeoutMultiplier = 2 + ), + startupCheckInterval = 1.second, + retryInterval = 10.seconds + ) ) ), telemetry = Config.Telemetry( diff --git a/sqs/src/main/resources/application.conf b/sqs/src/main/resources/application.conf index a862f2b43..663e7aca1 100644 --- a/sqs/src/main/resources/application.conf +++ b/sqs/src/main/resources/application.conf @@ -1,5 +1,7 @@ collector { streams { + good = ${collector.streams.sink} + bad = ${collector.streams.sink} sink { enabled = sqs threadPoolSize = 10 @@ -13,6 +15,7 @@ collector { maxBytes = 192000 startupCheckInterval = 1 second + buffer = ${collector.streams.buffer} } buffer { diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala index f7a8759e0..006ffb719 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsCollector.scala @@ -23,22 +23,10 @@ import com.snowplowanalytics.snowplow.collectors.scalastream.sinks._ object SqsCollector extends App[SqsSinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[SqsSinkConfig]): Resource[IO, Sinks[IO]] = { - val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.sink.threadPoolSize) + val threadPoolExecutor = new ScheduledThreadPoolExecutor(config.good.config.threadPoolSize) for { - good <- SqsSink.create[IO]( - config.sink.maxBytes, - config.sink, - config.buffer, - config.good, - threadPoolExecutor - ) - bad <- SqsSink.create[IO]( - config.sink.maxBytes, - config.sink, - config.buffer, - config.bad, - threadPoolExecutor - ) + good <- SqsSink.create[IO](config.good, threadPoolExecutor) + bad <- SqsSink.create[IO](config.bad, threadPoolExecutor) } yield Sinks(good, bad) } @@ -47,7 +35,7 @@ object SqsCollector extends App[SqsSinkConfig](BuildInfo) { .getAccountId(config) .map(id => Telemetry.TelemetryInfo( - region = Some(config.sink.region), + region = Some(config.good.config.region), cloud = Some("AWS"), unhashedInstallationId = id ) diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala index 7aa013c77..f0b14ef9e 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/TelemetryUtils.scala @@ -9,11 +9,11 @@ object TelemetryUtils { def getAccountId(config: Config.Streams[SqsSinkConfig]): IO[Option[String]] = Resource .make( - IO(SqsSink.createSqsClient(config.sink.region)).rethrow + IO(SqsSink.createSqsClient(config.good.config.region)).rethrow )(c => IO(c.shutdown())) .use { client => IO { - val sqsQueueUrl = client.getQueueUrl(config.good).getQueueUrl + val sqsQueueUrl = client.getQueueUrl(config.good.name).getQueueUrl Some(extractAccountId(sqsQueueUrl)) } } diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala index af5708b1a..c5317b0a3 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSink.scala @@ -281,16 +281,13 @@ object SqsSink { final case class BatchResultErrorInfo(code: String, message: String) def create[F[_]: Sync]( - maxBytes: Int, - sqsConfig: SqsSinkConfig, - bufferConfig: Config.Buffer, - queueName: String, + sqsConfig: Config.Sink[SqsSinkConfig], executorService: ScheduledExecutorService ): Resource[F, SqsSink[F]] = { val acquire = Sync[F] .delay( - createAndInitialize(maxBytes, sqsConfig, bufferConfig, queueName, executorService) + createAndInitialize(sqsConfig, executorService) ) .rethrow val release = (sink: SqsSink[F]) => Sync[F].delay(sink.shutdown()) @@ -309,14 +306,12 @@ object SqsSink { * during its construction. */ def createAndInitialize[F[_]: Sync]( - maxBytes: Int, - sqsConfig: SqsSinkConfig, - bufferConfig: Config.Buffer, - queueName: String, + sqsConfig: Config.Sink[SqsSinkConfig], executorService: ScheduledExecutorService ): Either[Throwable, SqsSink[F]] = - createSqsClient(sqsConfig.region).map { c => - val sqsSink = new SqsSink(maxBytes, c, sqsConfig, bufferConfig, queueName, executorService) + createSqsClient(sqsConfig.config.region).map { c => + val sqsSink = + new SqsSink(sqsConfig.config.maxBytes, c, sqsConfig.config, sqsConfig.buffer, sqsConfig.name, executorService) sqsSink.EventStorage.scheduleFlush() sqsSink.checkSqsHealth() sqsSink diff --git a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala index 7db8b879f..c8694713d 100644 --- a/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala +++ b/sqs/src/main/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/SqsSinkConfig.scala @@ -3,18 +3,14 @@ package com.snowplowanalytics.snowplow.collectors.scalastream.sinks import io.circe.Decoder import io.circe.generic.semiauto._ -import com.snowplowanalytics.snowplow.collector.core.Config - final case class SqsSinkConfig( maxBytes: Int, region: String, backoffPolicy: SqsSinkConfig.BackoffPolicyConfig, threadPoolSize: Int -) extends Config.Sink +) object SqsSinkConfig { - final case class AWSConfig(accessKey: String, secretKey: String) - final case class BackoffPolicyConfig(minBackoff: Long, maxBackoff: Long, maxRetries: Int) implicit val configDecoder: Decoder[SqsSinkConfig] = deriveDecoder[SqsSinkConfig] diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index d02c99fe4..704a6c0dd 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -111,23 +111,42 @@ object SqsConfigSpec { idleTimeout = 610.seconds ), streams = Config.Streams( - good = "good", - bad = "bad", useIpAddressAsPartitionKey = false, - buffer = Config.Buffer( - byteLimit = 3145728, - recordLimit = 500, - timeLimit = 5000 + good = Config.Sink( + name = "good", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 + ), + config = SqsSinkConfig( + maxBytes = 192000, + region = "eu-central-1", + backoffPolicy = SqsSinkConfig.BackoffPolicyConfig( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + threadPoolSize = 10 + ) ), - sink = SqsSinkConfig( - maxBytes = 192000, - region = "eu-central-1", - backoffPolicy = SqsSinkConfig.BackoffPolicyConfig( - minBackoff = 500, - maxBackoff = 1500, - maxRetries = 3 + bad = Config.Sink( + name = "bad", + buffer = Config.Buffer( + byteLimit = 3145728, + recordLimit = 500, + timeLimit = 5000 ), - threadPoolSize = 10 + config = SqsSinkConfig( + maxBytes = 192000, + region = "eu-central-1", + backoffPolicy = SqsSinkConfig.BackoffPolicyConfig( + minBackoff = 500, + maxBackoff = 1500, + maxRetries = 3 + ), + threadPoolSize = 10 + ) ) ), telemetry = Config.Telemetry( diff --git a/stdout/src/main/resources/application.conf b/stdout/src/main/resources/application.conf index 570541343..c65ae089f 100644 --- a/stdout/src/main/resources/application.conf +++ b/stdout/src/main/resources/application.conf @@ -1,7 +1,15 @@ collector { streams { + good = ${collector.streams.sink} + bad = ${collector.streams.sink} sink { maxBytes = 1000000000 + buffer = ${collector.streams.buffer} + } + buffer { + byteLimit = 3145728 + recordLimit = 500 + timeLimit = 5000 } } } \ No newline at end of file diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala index 59e16e209..99a727bba 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/SinkConfig.scala @@ -3,11 +3,9 @@ package com.snowplowanalytics.snowplow.collector.stdout import io.circe.Decoder import io.circe.generic.semiauto._ -import com.snowplowanalytics.snowplow.collector.core.Config - final case class SinkConfig( maxBytes: Int -) extends Config.Sink +) object SinkConfig { implicit val configDecoder: Decoder[SinkConfig] = deriveDecoder[SinkConfig] diff --git a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala index c307c5bc3..b5d479d4e 100644 --- a/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala +++ b/stdout/src/main/scala/com.snowplowanalytics.snowplow.collector.stdout/StdoutCollector.scala @@ -8,8 +8,8 @@ import com.snowplowanalytics.snowplow.collector.core.{App, Config, Telemetry} object StdoutCollector extends App[SinkConfig](BuildInfo) { override def mkSinks(config: Config.Streams[SinkConfig]): Resource[IO, Sinks[IO]] = { - val good = new PrintingSink[IO](config.sink.maxBytes, System.out) - val bad = new PrintingSink[IO](config.sink.maxBytes, System.err) + val good = new PrintingSink[IO](config.good.config.maxBytes, System.out) + val bad = new PrintingSink[IO](config.bad.config.maxBytes, System.err) Resource.pure(Sinks(good, bad)) }