diff --git a/build.sbt b/build.sbt index 01116ed90..e89dced04 100644 --- a/build.sbt +++ b/build.sbt @@ -29,6 +29,8 @@ lazy val core = project Dependencies.Libraries.circeConfig, Dependencies.Libraries.trackerCore, Dependencies.Libraries.emitterHttps, + Dependencies.Libraries.datadogHttp4s, + Dependencies.Libraries.datadogStatsd, Dependencies.Libraries.specs2, Dependencies.Libraries.specs2CE, Dependencies.Libraries.ceTestkit, diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 96dfd594f..60014bc81 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -70,6 +70,9 @@ port = 8125 period = 10 seconds prefix = snowplow.collector + tags = { + "app": "collector" + } } } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index 923427114..e32609f81 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -120,7 +120,8 @@ object Config { hostname: String, port: Int, period: FiniteDuration, - prefix: String + prefix: String, + tags: Map[String, String] ) case class SSL( diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 8310aeefc..726ae61b8 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -10,9 +10,13 @@ package com.snowplowanalytics.snowplow.collector.core import cats.effect.{Async, Resource} import cats.implicits._ -import org.http4s.HttpApp +import com.avast.datadog4s.api.Tag +import com.avast.datadog4s.extension.http4s.DatadogMetricsOps +import com.avast.datadog4s.{StatsDMetricFactory, StatsDMetricFactoryConfig} +import org.http4s.HttpRoutes import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.server.Server +import org.http4s.server.middleware.Metrics import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger @@ -24,15 +28,38 @@ object HttpServer { implicit private def logger[F[_]: Async]: Logger[F] = Slf4jLogger.getLogger[F] def build[F[_]: Async]( - app: HttpApp[F], + routes: HttpRoutes[F], port: Int, secure: Boolean, - networking: Config.Networking + networking: Config.Networking, + metricsConfig: Config.Metrics ): Resource[F, Server] = - buildBlazeServer[F](app, port, secure, networking) + for { + withMetricsMiddleware <- createMetricsMiddleware(routes, metricsConfig) + server <- buildBlazeServer[F](withMetricsMiddleware, port, secure, networking) + } yield server + + private def createMetricsMiddleware[F[_]: Async]( + routes: HttpRoutes[F], + metricsConfig: Config.Metrics + ): Resource[F, HttpRoutes[F]] = + if (metricsConfig.statsd.enabled) { + val metricsFactory = StatsDMetricFactory.make(createStatsdConfig(metricsConfig)) + metricsFactory.evalMap(DatadogMetricsOps.builder[F](_).build()).map { metricsOps => + Metrics[F](metricsOps)(routes) + } + } else { + Resource.pure(routes) + } + + private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = { + val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port) + val tags = metricsConfig.statsd.tags.toSeq.map { case (name, value) => Tag.of(name, value) } + StatsDMetricFactoryConfig(Some(metricsConfig.statsd.prefix), server, defaultTags = tags) + } private def buildBlazeServer[F[_]: Async]( - app: HttpApp[F], + routes: HttpRoutes[F], port: Int, secure: Boolean, networking: Config.Networking @@ -40,7 +67,7 @@ object HttpServer { Resource.eval(Logger[F].info("Building blaze server")) >> BlazeServerBuilder[F] .bindSocketAddress(new InetSocketAddress(port)) - .withHttpApp(app) + .withHttpApp(routes.orNotFound) .withIdleTimeout(networking.idleTimeout) .withMaxConnections(networking.maxConnections) .cond(secure, _.withSslContext(SSLContext.getDefault)) diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala index 83f38c2e5..7125a9a7c 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Routes.scala @@ -92,9 +92,8 @@ class Routes[F[_]: Sync]( service.crossdomainResponse } - val value: HttpApp[F] = { + val value: HttpRoutes[F] = { val routes = healthRoutes <+> corsRoute <+> cookieRoutes <+> rootRoute <+> crossdomainRoute - val res = if (enableDefaultRedirect) routes else rejectRedirect <+> routes - res.orNotFound + if (enableDefaultRedirect) routes else rejectRedirect <+> routes } } diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 2d9f44e21..70c9a80b4 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -87,7 +87,8 @@ object Run { ).value, if (config.ssl.enable) config.ssl.port else config.port, config.ssl.enable, - config.networking + config.networking, + config.monitoring.metrics ) _ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer) httpClient <- BlazeClientBuilder[F].resource diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala index 0278c9bd4..b3a01f551 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/RoutesSpec.scala @@ -71,7 +71,8 @@ class RoutesSpec extends Specification { enableCrossdomainTracking: Boolean = false ) = { val service = new TestService() - val routes = new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value + val routes = + new Routes(enabledDefaultRedirect, enableRootResponse, enableCrossdomainTracking, service).value.orNotFound (service, routes) } diff --git a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index 2dc0b780e..01c17dc23 100644 --- a/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/core/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -102,7 +102,8 @@ object TestUtils { "localhost", 8125, 10.seconds, - "snowplow.collector" + "snowplow.collector", + Map("app" -> "collector") ) ) ), diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 3942386e5..fd7b53266 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -90,8 +90,11 @@ object KafkaConfigSpec { body = "" ), cors = Config.CORS(1.hour), - monitoring = - Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + monitoring = Config.Monitoring( + Config.Metrics( + Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector")) + ) + ), ssl = Config.SSL(enable = false, redirect = false, port = 443), enableDefaultRedirect = false, redirectDomains = Set.empty, diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index 346480116..21c4e8275 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -105,8 +105,11 @@ object KinesisConfigSpec { body = "" ), cors = Config.CORS(1.hour), - monitoring = - Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + monitoring = Config.Monitoring( + Config.Metrics( + Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector")) + ) + ), ssl = Config.SSL(enable = false, redirect = false, port = 443), enableDefaultRedirect = false, redirectDomains = Set.empty, diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index 4e5574ca0..6f4b9161f 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -89,8 +89,11 @@ object NsqConfigSpec { body = "" ), cors = Config.CORS(1.hour), - monitoring = - Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + monitoring = Config.Monitoring( + Config.Metrics( + Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector")) + ) + ), ssl = Config.SSL(enable = false, redirect = false, port = 443), enableDefaultRedirect = false, redirectDomains = Set.empty, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 916de7f5b..4514dc102 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -38,6 +38,7 @@ object Dependencies { val thrift = "0.15.0" // force this version to mitigate security vulnerabilities val tracker = "2.0.0" val azureAuth = "1.7.14" + val dataDog4s = "0.32.0" } object Libraries { @@ -57,6 +58,8 @@ object Dependencies { val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j val thrift = "org.apache.thrift" % "libthrift" % V.thrift val trackerCore = "com.snowplowanalytics" %% "snowplow-scala-tracker-core" % V.tracker + val datadogHttp4s = "com.avast.cloud" %% "datadog4s-http4s" % V.dataDog4s + val datadogStatsd = "com.avast.cloud" %% "datadog4s-statsd" % V.dataDog4s //sinks val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index bc5ce9d8f..b62788336 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -90,8 +90,11 @@ object ConfigSpec { body = "" ), cors = Config.CORS(1.hour), - monitoring = - Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + monitoring = Config.Monitoring( + Config.Metrics( + Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector")) + ) + ), ssl = Config.SSL(enable = false, redirect = false, port = 443), enableDefaultRedirect = false, redirectDomains = Set.empty, diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index 7b1f33d25..da66a5c8e 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -90,8 +90,11 @@ object SqsConfigSpec { body = "" ), cors = Config.CORS(1.hour), - monitoring = - Config.Monitoring(Config.Metrics(Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector"))), + monitoring = Config.Monitoring( + Config.Metrics( + Config.Statsd(false, "localhost", 8125, 10.seconds, "snowplow.collector", Map("app" -> "collector")) + ) + ), ssl = Config.SSL(enable = false, redirect = false, port = 443), enableDefaultRedirect = false, redirectDomains = Set.empty,