diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClientBuilder.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClientBuilder.scala index 6dd2c4e79..602003943 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClientBuilder.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/BlazeClientBuilder.scala @@ -68,6 +68,7 @@ import scala.concurrent.duration._ * @param maxIdleDuration maximum time a connection can be idle and still * be borrowed. Helps deal with connections that are closed while * idling in the pool for an extended period. + * @param maxBorrowDuration the maximum time for connection borrowing */ sealed abstract class BlazeClientBuilder[F[_]] private ( val responseHeaderTimeout: Duration, @@ -93,6 +94,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( val customDnsResolver: Option[RequestKey => Either[Throwable, InetSocketAddress]], val retries: Int, val maxIdleDuration: Duration, + val maxBorrowDuration: Duration, )(implicit protected val F: Async[F]) extends BlazeBackendBuilder[Client[F]] with BackendBuilder[F, Client[F]] { @@ -148,6 +150,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( customDnsResolver = customDnsResolver, retries = 0, maxIdleDuration = Duration.Inf, + maxBorrowDuration = Duration.Inf, )(F) private def copy( @@ -175,6 +178,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( customDnsResolver, retries: Int = retries, maxIdleDuration: Duration = maxIdleDuration, + maxBorrowDuration: Duration = maxBorrowDuration, ): BlazeClientBuilder[F] = new BlazeClientBuilder[F]( responseHeaderTimeout = responseHeaderTimeout, @@ -200,6 +204,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( customDnsResolver = customDnsResolver, retries = retries, maxIdleDuration = maxIdleDuration, + maxBorrowDuration = maxBorrowDuration, ) {} @deprecated( @@ -434,6 +439,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( requestTimeout = requestTimeout, executionContext = executionContext, maxIdleDuration = maxIdleDuration, + maxBorrowDuration = maxBorrowDuration, ) ) )(_.shutdown) @@ -467,6 +473,7 @@ object BlazeClientBuilder { customDnsResolver = None, retries = 2, maxIdleDuration = Duration.Inf, + maxBorrowDuration = Duration.Inf, ) {} @deprecated( diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/ConnectionManager.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/ConnectionManager.scala index 2f5ada5e1..7a24a1a96 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/ConnectionManager.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/ConnectionManager.scala @@ -77,6 +77,7 @@ private object ConnectionManager { * @param maxWaitQueueLimit maximum number requests waiting for a connection at any specific time * @param maxConnectionsPerRequestKey Map of RequestKey to number of max connections * @param executionContext `ExecutionContext` where async operations will execute + * @param maxBorrowDuration the maximum time for connection borrowing */ def pool[F[_]: Async, A <: Connection[F]]( builder: ConnectionBuilder[F, A], @@ -87,6 +88,7 @@ private object ConnectionManager { requestTimeout: Duration, executionContext: ExecutionContext, maxIdleDuration: Duration, + maxBorrowDuration: Duration, ): F[ConnectionManager.Stateful[F, A]] = Semaphore(1).map { semaphore => new PoolManager[F, A]( @@ -99,6 +101,7 @@ private object ConnectionManager { semaphore, executionContext, maxIdleDuration, + maxBorrowDuration, ) } @@ -121,5 +124,6 @@ private object ConnectionManager { requestTimeout, executionContext, Duration.Inf, + Duration.Inf, ) } diff --git a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala index d2615abac..e92960cfe 100644 --- a/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala +++ b/blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala @@ -37,7 +37,8 @@ final case class WaitQueueFullFailure() extends RuntimeException { } /** @param maxIdleDuration the maximum time a connection can be idle - * and still be borrowed + * and still be borrowed + * @param maxBorrowDuration the maximum time for connection borrowing */ private final class PoolManager[F[_], A <: Connection[F]]( builder: ConnectionBuilder[F, A], @@ -49,6 +50,7 @@ private final class PoolManager[F[_], A <: Connection[F]]( semaphore: Semaphore[F], implicit private val executionContext: ExecutionContext, maxIdleDuration: Duration, + maxBorrowDuration: Duration, )(implicit F: Async[F]) extends ConnectionManager.Stateful[F, A] { self => @@ -73,6 +75,7 @@ private final class PoolManager[F[_], A <: Connection[F]]( semaphore, executionContext, Duration.Inf, + Duration.Inf, )(F) private sealed case class PooledConnection(conn: A, borrowDeadline: Option[Deadline]) @@ -184,12 +187,14 @@ private final class PoolManager[F[_], A <: Connection[F]]( /** This generates a effect of Next Connection. The following calls are executed asynchronously * with respect to whenever the execution of this task can occur. + * If borrowing takes more than [[maxBorrowDuration]] (if it's a finite duration) then + * [[ConnectionBorrowingException]] will be thrown. * * If the pool is closed the effect failure is executed. * * If the pool is not closed then we look for any connections in the idleQueues that match * the RequestKey requested. - * If a matching connection exists and it is stil open the callback is executed with the connection. + * If a matching connection exists and it is still open the callback is executed with the connection. * If a matching connection is closed we deallocate and repeat the check through the idleQueues. * If no matching connection is found, and the pool is not full we create a new Connection to perform * the request. @@ -201,78 +206,100 @@ private final class PoolManager[F[_], A <: Connection[F]]( * @param key The Request Key For The Connection * @return An effect of NextConnection */ - def borrow(key: RequestKey): F[NextConnection] = - F.async { callback => - semaphore.permit.use { _ => - if (!isClosed) { - def go(): F[Unit] = - getConnectionFromQueue(key).flatMap { - case Some(pooled) if pooled.conn.isClosed => - F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *> - decrConnection(key) *> - go() - - case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) => - F.delay( - logger.debug(s"Shutting down and evicting expired connection for $key: $stats") - ) *> - decrConnection(key) *> - F.delay(pooled.conn.shutdown()) *> - go() - - case Some(pooled) => - F.delay(logger.debug(s"Recycling connection for $key: $stats")) *> - F.delay(callback(Right(NextConnection(pooled.conn, fresh = false)))) - - case None if numConnectionsCheckHolds(key) => - F.delay( - logger.debug(s"Active connection not found for $key. Creating new one. $stats") - ) *> - createConnection(key, callback) - - case None if maxConnectionsPerRequestKey(key) <= 0 => - F.delay(callback(Left(NoConnectionAllowedException(key)))) - - case None if curTotal == maxTotal => - val keys = idleQueues.keys - if (keys.nonEmpty) + def borrow(key: RequestKey): F[NextConnection] = { + val connectionBorrowF = + F.async[NextConnection] { callback => + semaphore.permit.use { _ => + if (!isClosed) { + def go(): F[Unit] = + getConnectionFromQueue(key).flatMap { + case Some(pooled) if pooled.conn.isClosed => + F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *> + decrConnection(key) *> + go() + + case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) => + F.delay( + logger.debug( + s"Shutting down and evicting expired connection for $key: $stats" + ) + ) *> + decrConnection(key) *> + F.delay(pooled.conn.shutdown()) *> + go() + + case Some(pooled) => + F.delay(logger.debug(s"Recycling connection for $key: $stats")) *> + F.delay(callback(Right(NextConnection(pooled.conn, fresh = false)))) + + case None if numConnectionsCheckHolds(key) => F.delay( logger.debug( - s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats" + s"Active connection not found for $key. Creating new one. $stats" ) ) *> - F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap { - randKey => - getConnectionFromQueue(randKey).map( - _.fold( - logger.warn(s"No connection to evict from the idleQueue for $randKey") - )(_.conn.shutdown()) - ) *> - decrConnection(randKey) - } *> createConnection(key, callback) - else + + case None if maxConnectionsPerRequestKey(key) <= 0 => + F.delay(callback(Left(NoConnectionAllowedException(key)))) + + case None if curTotal == maxTotal => + val keys = idleQueues.keys + if (keys.nonEmpty) + F.delay( + logger.debug( + s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats" + ) + ) *> + F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()) + .flatMap { randKey => + getConnectionFromQueue(randKey).map( + _.fold( + logger + .warn( + s"No connection to evict from the idleQueue for $randKey" + ) + )(_.conn.shutdown()) + ) *> + decrConnection(randKey) + } *> + createConnection(key, callback) + else + F.delay( + logger.debug( + s"No connections available for the desired key, $key. Adding to waitQueue: $stats" + ) + ) *> + addToWaitQueue(key, callback) + + case None => // we're full up. Add to waiting queue. F.delay( logger.debug( - s"No connections available for the desired key, $key. Adding to waitQueue: $stats" + s"No connections available for $key. Waiting on new connection: $stats" ) ) *> addToWaitQueue(key, callback) - - case None => // we're full up. Add to waiting queue. - F.delay( - logger.debug( - s"No connections available for $key. Waiting on new connection: $stats" - ) - ) *> - addToWaitQueue(key, callback) - } - - F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go()).as(None) - } else - F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))).as(None) + } + + F.delay(logger.debug(s"Requesting connection for $key: $stats")) + .productR(go()) + .as(None) + } else + F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))) + .as(None) + } } + + maxBorrowDuration match { + case d: FiniteDuration => + connectionBorrowF.timeoutTo( + d, + F.defer(F.raiseError(ConnectionBorrowingException(key))), + ) + case _ => + connectionBorrowF } + } private def releaseRecyclable(key: RequestKey, connection: A): F[Unit] = F.delay(waitQueue.dequeueFirst(_.key == key)).flatMap { @@ -451,3 +478,6 @@ private final class PoolManager[F[_], A <: Connection[F]]( final case class NoConnectionAllowedException(key: RequestKey) extends IllegalArgumentException(s"No client connections allowed to $key") + +final case class ConnectionBorrowingException(key: RequestKey) + extends IllegalStateException(s"Requesting connection for $key has exceeded the timeout") diff --git a/blaze-client/src/test/scala/org/http4s/blaze/client/PoolManagerSuite.scala b/blaze-client/src/test/scala/org/http4s/blaze/client/PoolManagerSuite.scala index 05f5e5398..60f6778e5 100644 --- a/blaze-client/src/test/scala/org/http4s/blaze/client/PoolManagerSuite.scala +++ b/blaze-client/src/test/scala/org/http4s/blaze/client/PoolManagerSuite.scala @@ -51,6 +51,7 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax { requestTimeout: Duration = Duration.Inf, builder: ConnectionBuilder[IO, TestConnection] = _ => IO(new TestConnection()), maxIdleDuration: Duration = Duration.Inf, + maxBorrowDuration: Duration = Duration.Inf, ) = ConnectionManager.pool( builder = builder, @@ -61,6 +62,7 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax { requestTimeout = requestTimeout, executionContext = ExecutionContext.Implicits.global, maxIdleDuration = maxIdleDuration, + maxBorrowDuration = maxBorrowDuration, ) test("A pool manager should wait up to maxWaitQueueLimit") { @@ -262,4 +264,42 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax { _ <- pool.borrow(key) } yield assert(conn1.connection.isClosed) } + + test("Should not borrow connection if it exceeds the maxBorrowDuration") { + val connectionBuilder: ConnectionBuilder[IO, TestConnection] = + _ => IO.sleep(50.milliseconds) *> IO(new TestConnection()) + + for { + pool <- mkPool( + maxTotal = 1, + builder = connectionBuilder, + maxIdleDuration = Duration.Inf, + maxBorrowDuration = 10.milliseconds, + ) + _ <- pool + .borrow(key) + .as(false) + .recover { case ConnectionBorrowingException(_) => + true + } + .assert + } yield () + } + + test("Should borrow connection if it doesn't exceed the maxBorrowDuration") { + for { + pool <- mkPool( + maxTotal = 1, + maxIdleDuration = Duration.Inf, + maxBorrowDuration = 2.seconds, + ) + _ <- pool + .borrow(key) + .as(true) + .recover { case ConnectionBorrowingException(_) => + false + } + .assert + } yield () + } } diff --git a/build.sbt b/build.sbt index fd6cc48ae..3f6309350 100644 --- a/build.sbt +++ b/build.sbt @@ -241,6 +241,8 @@ lazy val blazeClient = project ), mimaBinaryIssueFilters ++= Seq( // private constructor + ProblemFilters + .exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.BlazeClientBuilder.this"), ProblemFilters .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.BlazeClientBuilder.this"), ProblemFilters @@ -284,12 +286,16 @@ lazy val blazeClient = project .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.PoolManager.invalidate"), ProblemFilters .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.BasicManager.this"), + ProblemFilters + .exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.ConnectionManager.pool"), ProblemFilters .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.ConnectionManager.pool"), ProblemFilters .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.ConnectionManager.basic"), ProblemFilters .exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.PoolManager.this"), + ProblemFilters + .exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.PoolManager.this"), // inside private trait/clas/object ProblemFilters .exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.BlazeConnection.runRequest"),