diff --git a/http4s/src/main/resources/reference.conf b/http4s/src/main/resources/reference.conf index 9ae8f6849..929d36685 100644 --- a/http4s/src/main/resources/reference.conf +++ b/http4s/src/main/resources/reference.conf @@ -86,6 +86,11 @@ port = 443 } + networking { + maxConnections = 1024 + idleTimeout = 610 seconds + } + enableDefaultRedirect = false preTerminationPeriod = 10 seconds diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala index 62e4c0d07..cf5bacdcb 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Config.scala @@ -3,6 +3,7 @@ package com.snowplowanalytics.snowplow.collector.core import scala.concurrent.duration._ import io.circe.config.syntax._ + import io.circe.generic.semiauto._ import io.circe.Decoder import io.circe._ @@ -25,6 +26,7 @@ case class Config[+SinkConfig]( monitoring: Config.Monitoring, telemetry: Config.Telemetry, ssl: Config.SSL, + networking: Config.Networking, enableDefaultRedirect: Boolean, redirectDomains: Set[String], preTerminationPeriod: FiniteDuration @@ -140,6 +142,11 @@ object Config { autoGeneratedId: Option[String] ) + case class Networking( + maxConnections: Int, + idleTimeout: FiniteDuration + ) + implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = { implicit val p3p = deriveDecoder[P3P] implicit val crossDomain = deriveDecoder[CrossDomain] @@ -166,6 +173,7 @@ object Config { implicit val monitoring = deriveDecoder[Monitoring] implicit val ssl = deriveDecoder[SSL] implicit val telemetry = deriveDecoder[Telemetry] + implicit val networking = deriveDecoder[Networking] deriveDecoder[Config[SinkConfig]] } diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala index 7d0f76a8e..e62b7322f 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/HttpServer.scala @@ -8,8 +8,6 @@ import io.netty.handler.ssl._ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import scala.concurrent.duration.DurationLong - import com.comcast.ip4s.{IpAddress, Port} import cats.implicits._ @@ -33,12 +31,13 @@ object HttpServer { app: HttpApp[F], interface: String, port: Int, - secure: Boolean + secure: Boolean, + networking: Config.Networking ): Resource[F, Server] = sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match { - case Some("BLAZE") | None => buildBlazeServer[F](app, port, secure) - case Some("EMBER") => buildEmberServer[F](app, interface, port, secure) - case Some("NETTY") => buildNettyServer[F](app, port, secure) + case Some("BLAZE") | None => buildBlazeServer[F](app, port, secure, networking) + case Some("EMBER") => buildEmberServer[F](app, interface, port, secure, networking) + case Some("NETTY") => buildNettyServer[F](app, port, secure, networking) case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other") } @@ -46,7 +45,8 @@ object HttpServer { app: HttpApp[F], interface: String, port: Int, - secure: Boolean + secure: Boolean, + networking: Config.Networking ) = { implicit val network = Network.forAsync[F] Resource.eval(Logger[F].info("Building ember server")) >> @@ -55,7 +55,8 @@ object HttpServer { .withHost(IpAddress.fromString(interface).get) .withPort(Port.fromInt(port).get) .withHttpApp(app) - .withIdleTimeout(610.seconds) + .withIdleTimeout(networking.idleTimeout) + .withMaxConnections(networking.maxConnections) .cond(secure, _.withTLS(TLSContext.Builder.forAsync.fromSSLContext(SSLContext.getDefault))) .build } @@ -63,26 +64,29 @@ object HttpServer { private def buildBlazeServer[F[_]: Async]( app: HttpApp[F], port: Int, - secure: Boolean + secure: Boolean, + networking: Config.Networking ): Resource[F, Server] = Resource.eval(Logger[F].info("Building blaze server")) >> BlazeServerBuilder[F] .bindSocketAddress(new InetSocketAddress(port)) .withHttpApp(app) - .withIdleTimeout(610.seconds) + .withIdleTimeout(networking.idleTimeout) + .withMaxConnections(networking.maxConnections) .cond(secure, _.withSslContext(SSLContext.getDefault)) .resource private def buildNettyServer[F[_]: Async]( app: HttpApp[F], port: Int, - secure: Boolean + secure: Boolean, + networking: Config.Networking ) = Resource.eval(Logger[F].info("Building netty server")) >> NettyServerBuilder[F] .bindLocal(port) .withHttpApp(app) - .withIdleTimeout(610.seconds) + .withIdleTimeout(networking.idleTimeout) .cond( secure, _.withSslContext( diff --git a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala index 20bed625b..944785107 100644 --- a/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala +++ b/http4s/src/main/scala/com.snowplowanalytics.snowplow.collector.core/Run.scala @@ -74,7 +74,8 @@ object Run { new Routes[F](config.enableDefaultRedirect, collectorService).value, config.interface, if (config.ssl.enable) config.ssl.port else config.port, - config.ssl.enable + config.ssl.enable, + config.networking ) _ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer) httpClient <- BlazeClientBuilder[F].resource diff --git a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala index c465521ce..647871ee4 100644 --- a/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala +++ b/http4s/src/test/scala/com.snowplowanalytics.snowplow.collector.core/TestUtils.scala @@ -101,6 +101,10 @@ object TestUtils { false, 443 ), + networking = Networking( + 1024, + 610.seconds + ), enableDefaultRedirect = false, redirectDomains = Set.empty[String], preTerminationPeriod = 10.seconds, diff --git a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala index 24bc9a288..703fd1563 100644 --- a/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala +++ b/kafka/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/KafkaConfigSpec.scala @@ -134,6 +134,10 @@ object KafkaConfigSpec { moduleVersion = None, instanceId = None, autoGeneratedId = None + ), + networking = Config.Networking( + maxConnections = 1024, + idleTimeout = 610.seconds ) ) } diff --git a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala index bf97c260b..3196fc84b 100644 --- a/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala +++ b/kinesis/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/sinks/KinesisConfigSpec.scala @@ -110,6 +110,10 @@ object KinesisConfigSpec { enableDefaultRedirect = false, redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, + networking = Config.Networking( + maxConnections = 1024, + idleTimeout = 610.seconds + ), streams = Config.Streams( good = "good", bad = "bad", diff --git a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala index 7c64c3e6a..6068295a2 100644 --- a/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala +++ b/nsq/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/NsqConfigSpec.scala @@ -133,6 +133,10 @@ object NsqConfigSpec { moduleVersion = None, instanceId = None, autoGeneratedId = None + ), + networking = Config.Networking( + maxConnections = 1024, + idleTimeout = 610.seconds ) ) } diff --git a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala index fbc2f4ae9..c3fc77eee 100644 --- a/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala +++ b/pubsub/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/ConfigSpec.scala @@ -95,6 +95,10 @@ object ConfigSpec { enableDefaultRedirect = false, redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, + networking = Config.Networking( + maxConnections = 1024, + idleTimeout = 610.seconds + ), streams = Config.Streams( good = "good", bad = "bad", diff --git a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala index a9054d8c8..d02c99fe4 100644 --- a/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala +++ b/sqs/src/test/scala/com.snowplowanalytics.snowplow.collectors.scalastream/SqsConfigSpec.scala @@ -106,6 +106,10 @@ object SqsConfigSpec { enableDefaultRedirect = false, redirectDomains = Set.empty, preTerminationPeriod = 10.seconds, + networking = Config.Networking( + maxConnections = 1024, + idleTimeout = 610.seconds + ), streams = Config.Streams( good = "good", bad = "bad",