diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala new file mode 100644 index 0000000..ed9727f --- /dev/null +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/MicroHttpServer.scala @@ -0,0 +1,83 @@ +package com.snowplowanalytics.snowplow.micro + +import cats.effect.{IO, Resource} +import cats.implicits._ +import com.avast.datadog4s.api.Tag +import com.avast.datadog4s.extension.http4s.DatadogMetricsOps +import com.avast.datadog4s.{StatsDMetricFactory, StatsDMetricFactoryConfig} +import com.snowplowanalytics.snowplow.collector.core.{Config => CollectorConfig} +import org.http4s.blaze.server.BlazeServerBuilder +import org.http4s.headers.`Strict-Transport-Security` +import org.http4s.server.Server +import org.http4s.server.middleware.{HSTS, Metrics} +import org.http4s.{HttpApp, HttpRoutes} +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import java.net.InetSocketAddress +import javax.net.ssl.SSLContext + + +/** The same http server builder as in collector but with option to customize SSLContext */ +object MicroHttpServer { + + implicit private def logger: Logger[IO] = Slf4jLogger.getLogger[IO] + + def build(routes: HttpRoutes[IO], + port: Int, + secure: Boolean, + customSslContext: Option[SSLContext], + hsts: CollectorConfig.HSTS, + networking: CollectorConfig.Networking, + metricsCollectorConfig: CollectorConfig.Metrics + ): Resource[IO, Server] = + for { + withMetricsMiddleware <- createMetricsMiddleware(routes, metricsCollectorConfig) + server <- buildBlazeServer(withMetricsMiddleware, port, secure, customSslContext, hsts, networking) + } yield server + + private def createMetricsMiddleware( + routes: HttpRoutes[IO], + metricsCollectorConfig: CollectorConfig.Metrics + ): Resource[IO, HttpRoutes[IO]] = + if (metricsCollectorConfig.statsd.enabled) { + val metricsFactory = StatsDMetricFactory.make[IO](createStatsdCollectorConfig(metricsCollectorConfig)) + metricsFactory.evalMap(DatadogMetricsOps.builder[IO](_).useDistributionBasedTimers().build()).map { metricsOps => + Metrics[IO](metricsOps)(routes) + } + } else { + Resource.pure(routes) + } + + private def createStatsdCollectorConfig(metricsCollectorConfig: CollectorConfig.Metrics): StatsDMetricFactoryConfig = { + val server = InetSocketAddress.createUnresolved(metricsCollectorConfig.statsd.hostname, metricsCollectorConfig.statsd.port) + val tags = metricsCollectorConfig.statsd.tags.toVector.map { case (name, value) => Tag.of(name, value) } + StatsDMetricFactoryConfig(Some(metricsCollectorConfig.statsd.prefix), server, defaultTags = tags) + } + + private def hstsMiddleware(hsts: CollectorConfig.HSTS, routes: HttpApp[IO]): HttpApp[IO] = + if (hsts.enable) + HSTS(routes, `Strict-Transport-Security`.unsafeFromDuration(hsts.maxAge)) + else routes + + private def buildBlazeServer(routes: HttpRoutes[IO], + port: Int, + secure: Boolean, + customSslContext: Option[SSLContext], + hsts: CollectorConfig.HSTS, + networking: CollectorConfig.Networking + ): Resource[IO, Server] = + Resource.eval(Logger[IO].info("Building blaze server")) >> + BlazeServerBuilder[IO] + .bindSocketAddress(new InetSocketAddress(port)) + .withHttpApp(hstsMiddleware(hsts, routes.orNotFound)) + .withIdleTimeout(networking.idleTimeout) + .withMaxConnections(networking.maxConnections) + .cond(secure, _.withSslContext(customSslContext.getOrElse(SSLContext.getDefault))) + .resource + + implicit class ConditionalAction[A](item: A) { + def cond(cond: Boolean, action: A => A): A = + if (cond) action(item) else item + } +} diff --git a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala index 9bf5329..b42120d 100644 --- a/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala +++ b/src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala @@ -53,10 +53,10 @@ object Run { private def buildEnvironment(config: MicroConfig): Resource[IO, Unit] = { for { - _ <- Resource.eval(setupSSLContext()) + customSslContext <- Resource.eval(setupSSLContext()) enrichmentRegistry <- buildEnrichmentRegistry(config.enrichmentsConfig) badProcessor = Processor(BuildInfo.name, BuildInfo.version) - adapterRegistry = MicroAdapterRegistry.create() + adapterRegistry = MicroAdapterRegistry.create() lookup = JavaNetRegistryLookup.ioLookupInstance[IO] sink = new MemorySink(config.iglu.client, lookup, enrichmentRegistry, config.outputEnrichedTsv, badProcessor, adapterRegistry) collectorService = new Service[IO]( @@ -73,20 +73,21 @@ object Run { miniRoutes = new Routing(config.iglu.resolver)(lookup).value allRoutes = miniRoutes <+> collectorRoutes - _ <- HttpServer.build[IO]( + _ <- MicroHttpServer.build( allRoutes, config.collector.port, secure = false, + customSslContext = None, config.collector.hsts, config.collector.networking, config.collector.monitoring.metrics ) - _ <- runHttpsServerIfEnabled(config, allRoutes) + _ <- runHttpsServerIfEnabled(config, customSslContext, allRoutes) } yield () } - private def setupSSLContext(): IO[Unit] = IO { - sys.env.get(Configuration.EnvironmentVariables.sslCertificatePassword).foreach { password => + private def setupSSLContext(): IO[Option[SSLContext]] = IO { + sys.env.get(Configuration.EnvironmentVariables.sslCertificatePassword).map { password => // Adapted from https://doc.akka.io/docs/akka-http/current/server-side/server-https-support.html. // We could use SSLContext.getDefault instead of all of this, but then we would need to // force the user to add arcane -D flags when running Micro, which is not the best experience. @@ -102,8 +103,7 @@ object Run { val context = SSLContext.getInstance("TLS") context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom) - - SSLContext.setDefault(context) + context } } @@ -138,12 +138,15 @@ object Run { } } - private def runHttpsServerIfEnabled(config: MicroConfig, routes: HttpRoutes[IO]): Resource[IO, Unit] = { + private def runHttpsServerIfEnabled(config: MicroConfig, + customSslContext: Option[SSLContext], + routes: HttpRoutes[IO]): Resource[IO, Unit] = { if (config.collector.ssl.enable) { - HttpServer.build[IO]( + MicroHttpServer.build( routes, config.collector.ssl.port, secure = true, + customSslContext, config.collector.hsts, config.collector.networking, config.collector.monitoring.metrics