diff --git a/build.sbt b/build.sbt index dc020e98c..c80872d35 100644 --- a/build.sbt +++ b/build.sbt @@ -22,6 +22,7 @@ lazy val core = project Dependencies.Libraries.http4sBlaze, Dependencies.Libraries.http4sEmber, Dependencies.Libraries.http4sNetty, + Dependencies.Libraries.http4sArmeria, Dependencies.Libraries.http4sClient, Dependencies.Libraries.log4cats, Dependencies.Libraries.thrift, diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index ce6d01652..bdce3703d 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -183,6 +183,7 @@ object Config { case object Blaze extends Backend case object Ember extends Backend case object Netty extends Backend + case object Armeria extends Backend } } @@ -223,10 +224,11 @@ object Config { implicit val sinkConfig = newDecoder[SinkConfig].or(legacyDecoder[SinkConfig]) implicit val streams = deriveDecoder[Streams[SinkConfig]] implicit val backend: Decoder[Experimental.Backend] = Decoder[String].emap { - case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) - case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) - case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) - case other => Left(s"Invalid backend $other") + case s if s.toLowerCase() == "blaze" => Right(Experimental.Backend.Blaze) + case s if s.toLowerCase() == "ember" => Right(Experimental.Backend.Ember) + case s if s.toLowerCase() == "netty" => Right(Experimental.Backend.Netty) + case s if s.toLowerCase() == "armeria" => Right(Experimental.Backend.Armeria) + case other => Left(s"Invalid backend $other") } implicit val experimental = deriveDecoder[Experimental] diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 62d6891b8..509bf1bc8 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -19,6 +19,7 @@ import org.http4s.{HttpApp, HttpRoutes} import org.http4s.blaze.server.BlazeServerBuilder import org.http4s.ember.server.EmberServerBuilder import org.http4s.netty.server.NettyServerBuilder +import org.http4s.armeria.server.ArmeriaServerBuilder import com.comcast.ip4s._ import fs2.io.net.Network import fs2.io.net.tls.TLSContext @@ -31,6 +32,13 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger import java.net.InetSocketAddress import javax.net.ssl.SSLContext +import io.netty.handler.ssl.{ClientAuth, JdkSslContext} +import io.netty.handler.ssl.IdentityCipherSuiteFilter +import io.netty.handler.ssl.ApplicationProtocolConfig +import java.util.Properties +import javax.net.ssl.KeyManagerFactory +import java.security.KeyStore +import java.io.FileInputStream object HttpServer { @@ -72,8 +80,10 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ) = backend match { - case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) - case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Ember => buildEmberServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Blaze => buildBlazeServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Netty => buildNettyServer(routes, port, secure, hsts, networking, debugHttp) + case Config.Experimental.Backend.Armeria => buildArmeriaServer(routes, port, secure, hsts, networking, debugHttp) } private def createStatsdConfig(metricsConfig: Config.Metrics): StatsDMetricFactoryConfig = { val server = InetSocketAddress.createUnresolved(metricsConfig.statsd.hostname, metricsConfig.statsd.port) @@ -145,7 +155,7 @@ object HttpServer { .build } - private def buildNettyServer[F[_]: Async: Network]( + private def buildNettyServer[F[_]: Async]( routes: HttpRoutes[F], port: Int, secure: Boolean, @@ -153,19 +163,84 @@ object HttpServer { networking: Config.Networking, debugHttp: Config.Debug.Http ): Resource[F, Server] = - Resource.eval(TLSContext.Builder.forAsync[F].system).flatMap { tls => - Resource.eval(Logger[F].info("Building netty server")) >> - NettyServerBuilder[F] - .bindHttp(port) - .withHttpApp( - loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) - ) + Resource.eval(Logger[F].info(s"Building netty server $secure $port")) >> + NettyServerBuilder[F] + .bindHttp(port) + .withHttpApp( + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) .withIdleTimeout(networking.idleTimeout) .withMaxInitialLineLength(networking.maxRequestLineLength) - .cond(secure, _.withSslContext(SSLContext.getDefault)) - .build + .cond( + secure, + _.withSslContext( + sslContext = new JdkSslContext( + SSLContext.getDefault, + false, + null, + IdentityCipherSuiteFilter.INSTANCE_DEFAULTING_TO_SUPPORTED_CIPHERS, + ApplicationProtocolConfig.DISABLED, + ClientAuth.OPTIONAL, + null, + false + ) + ) + ) + .resource + + private def buildArmeriaServer[F[_]: Async]( + routes: HttpRoutes[F], + port: Int, + secure: Boolean, + hsts: Config.HSTS, + networking: Config.Networking, + debugHttp: Config.Debug.Http + ): Resource[F, Server] = { + case class ArmeriaTlsConfig private (ksType: String, ksPath: String, ksPass: String) + object ArmeriaTlsConfig { + def from( + props: Properties + ): F[ArmeriaTlsConfig] = + (for { + t <- Option(props.getProperty("javax.net.ssl.keyStoreType")) + cert <- Option(props.getProperty("javax.net.ssl.keyStore")) + pass <- Option(props.getProperty("javax.net.ssl.keyStorePassword")) + } yield Async[F].delay(ArmeriaTlsConfig(t, cert, pass))).getOrElse( + Async[F].raiseError( + new IllegalStateException( + "Invalid SSL configuration. Missing required JSSE options. See: https://docs.snowplow.io/docs/pipeline-components-and-applications/stream-collector/configure/#tls-port-binding-and-certificate-240" + ) + ) + ) } + def mkTls(secure: Boolean): Resource[F, KeyManagerFactory] = + if (secure) { + for { + tlsConfig <- Resource.eval(ArmeriaTlsConfig.from(System.getProperties())) //FIXME make conditional + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + ks = KeyStore.getInstance(tlsConfig.ksType) + _ <- Resource.eval(Async[F].delay(ks.load(new FileInputStream(tlsConfig.ksPath), tlsConfig.ksPass.toArray))) + _ <- Resource.eval(Async[F].delay(kmf.init(ks, tlsConfig.ksPass.toArray))) + } yield kmf + } else Resource.never + + for { + _ <- Resource.eval(Logger[F].info(s"Building netty server")) + kmf <- mkTls(secure) + server <- ArmeriaServerBuilder[F] + .withHttp(port) + .withHttpApp( + "/", + loggerMiddleware(timeoutMiddleware(hstsMiddleware(hsts, routes.orNotFound), networking), debugHttp) + ) + .cond(secure, _.withTls(kmf)) + .withIdleTimeout(networking.idleTimeout) + .withRequestTimeout(networking.responseHeaderTimeout) + .resource + } yield server + } + implicit class ConditionalAction[A](item: A) { def cond(cond: Boolean, action: A => A): A = if (cond) action(item) else item diff --git a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 4786fed74..f61e6f248 100644 --- a/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/core/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -82,7 +82,7 @@ object Run { ) } - private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig]( + private def fromConfig[F[_]: Async: Network: Tracking, SinkConfig]( appInfo: AppInfo, mkSinks: MkSinks[F, SinkConfig], telemetryInfo: TelemetryInfo[F, SinkConfig], diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 22b76d411..89b2c3183 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,6 +25,7 @@ object Dependencies { val fs2PubSub = "0.22.0" val http4s = "0.23.23" val http4sNetty = "0.5.16" + val http4sArmeria = "0.5.3" val jackson = "2.12.7" // force this version to mitigate security vulnerabilities val fs2Kafka = "2.6.1" val log4cats = "2.6.0" @@ -56,6 +57,7 @@ object Dependencies { val decline = "com.monovore" %% "decline-effect" % V.decline val emitterHttps = "com.snowplowanalytics" %% "snowplow-scala-tracker-emitter-http4s" % V.tracker val http4sNetty = "org.http4s" %% "http4s-netty-server" % V.http4sNetty + val http4sArmeria = "org.http4s" %% "http4s-armeria-server" % V.http4sArmeria val http4sEmber = "org.http4s" %% "http4s-ember-server" % V.http4s val http4sBlaze = "org.http4s" %% "http4s-blaze-server" % V.blaze val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.blaze