Skip to content

Commit

Permalink
Make maxConnections and idleTimeout configurable
Browse files Browse the repository at this point in the history
Previously, idleTimeout has been hardcoded, maxConnections hasn't been
configured.
Now, these parameters are set within `networking` section and used throughout
http4s backends.
  • Loading branch information
peel authored and spenes committed Oct 16, 2023
1 parent 94a6cdb commit a509965
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 13 deletions.
5 changes: 5 additions & 0 deletions http4s/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
port = 443
}

networking {
maxConnections = 1024
idleTimeout = 610 seconds
}

enableDefaultRedirect = false
preTerminationPeriod = 10 seconds

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.snowplowanalytics.snowplow.collector.core
import scala.concurrent.duration._

import io.circe.config.syntax._

import io.circe.generic.semiauto._
import io.circe.Decoder
import io.circe._
Expand All @@ -25,6 +26,7 @@ case class Config[+SinkConfig](
monitoring: Config.Monitoring,
telemetry: Config.Telemetry,
ssl: Config.SSL,
networking: Config.Networking,
enableDefaultRedirect: Boolean,
redirectDomains: Set[String],
preTerminationPeriod: FiniteDuration
Expand Down Expand Up @@ -140,6 +142,11 @@ object Config {
autoGeneratedId: Option[String]
)

case class Networking(
maxConnections: Int,
idleTimeout: FiniteDuration
)

implicit def decoder[SinkConfig: Decoder]: Decoder[Config[SinkConfig]] = {
implicit val p3p = deriveDecoder[P3P]
implicit val crossDomain = deriveDecoder[CrossDomain]
Expand All @@ -166,6 +173,7 @@ object Config {
implicit val monitoring = deriveDecoder[Monitoring]
implicit val ssl = deriveDecoder[SSL]
implicit val telemetry = deriveDecoder[Telemetry]
implicit val networking = deriveDecoder[Networking]
deriveDecoder[Config[SinkConfig]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import io.netty.handler.ssl._
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.DurationLong

import com.comcast.ip4s.{IpAddress, Port}

import cats.implicits._
Expand All @@ -33,20 +31,22 @@ object HttpServer {
app: HttpApp[F],
interface: String,
port: Int,
secure: Boolean
secure: Boolean,
networking: Config.Networking
): Resource[F, Server] =
sys.env.get("HTTP4S_BACKEND").map(_.toUpperCase()) match {
case Some("BLAZE") | None => buildBlazeServer[F](app, port, secure)
case Some("EMBER") => buildEmberServer[F](app, interface, port, secure)
case Some("NETTY") => buildNettyServer[F](app, port, secure)
case Some("BLAZE") | None => buildBlazeServer[F](app, port, secure, networking)
case Some("EMBER") => buildEmberServer[F](app, interface, port, secure, networking)
case Some("NETTY") => buildNettyServer[F](app, port, secure, networking)
case Some(other) => throw new IllegalArgumentException(s"Unrecognized http4s backend $other")
}

private def buildEmberServer[F[_]: Async](
app: HttpApp[F],
interface: String,
port: Int,
secure: Boolean
secure: Boolean,
networking: Config.Networking
) = {
implicit val network = Network.forAsync[F]
Resource.eval(Logger[F].info("Building ember server")) >>
Expand All @@ -55,34 +55,38 @@ object HttpServer {
.withHost(IpAddress.fromString(interface).get)
.withPort(Port.fromInt(port).get)
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.cond(secure, _.withTLS(TLSContext.Builder.forAsync.fromSSLContext(SSLContext.getDefault)))
.build
}

private def buildBlazeServer[F[_]: Async](
app: HttpApp[F],
port: Int,
secure: Boolean
secure: Boolean,
networking: Config.Networking
): Resource[F, Server] =
Resource.eval(Logger[F].info("Building blaze server")) >>
BlazeServerBuilder[F]
.bindSocketAddress(new InetSocketAddress(port))
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.withIdleTimeout(networking.idleTimeout)
.withMaxConnections(networking.maxConnections)
.cond(secure, _.withSslContext(SSLContext.getDefault))
.resource

private def buildNettyServer[F[_]: Async](
app: HttpApp[F],
port: Int,
secure: Boolean
secure: Boolean,
networking: Config.Networking
) =
Resource.eval(Logger[F].info("Building netty server")) >>
NettyServerBuilder[F]
.bindLocal(port)
.withHttpApp(app)
.withIdleTimeout(610.seconds)
.withIdleTimeout(networking.idleTimeout)
.cond(
secure,
_.withSslContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ object Run {
new Routes[F](config.enableDefaultRedirect, collectorService).value,
config.interface,
if (config.ssl.enable) config.ssl.port else config.port,
config.ssl.enable
config.ssl.enable,
config.networking
)
_ <- withGracefulShutdown(config.preTerminationPeriod)(httpServer)
httpClient <- BlazeClientBuilder[F].resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ object TestUtils {
false,
443
),
networking = Networking(
1024,
610.seconds
),
enableDefaultRedirect = false,
redirectDomains = Set.empty[String],
preTerminationPeriod = 10.seconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ object KafkaConfigSpec {
moduleVersion = None,
instanceId = None,
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ object KinesisConfigSpec {
enableDefaultRedirect = false,
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
),
streams = Config.Streams(
good = "good",
bad = "bad",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ object NsqConfigSpec {
moduleVersion = None,
instanceId = None,
autoGeneratedId = None
),
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ object ConfigSpec {
enableDefaultRedirect = false,
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
),
streams = Config.Streams(
good = "good",
bad = "bad",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ object SqsConfigSpec {
enableDefaultRedirect = false,
redirectDomains = Set.empty,
preTerminationPeriod = 10.seconds,
networking = Config.Networking(
maxConnections = 1024,
idleTimeout = 610.seconds
),
streams = Config.Streams(
good = "good",
bad = "bad",
Expand Down

0 comments on commit a509965

Please sign in to comment.