Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max wait for connection borrowing in pool manager #681

Open
wants to merge 1 commit into
base: series/0.23
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import scala.concurrent.duration._
* @param maxIdleDuration maximum time a connection can be idle and still
* be borrowed. Helps deal with connections that are closed while
* idling in the pool for an extended period.
* @param maxBorrowDuration the maximum time for connection borrowing
*/
sealed abstract class BlazeClientBuilder[F[_]] private (
val responseHeaderTimeout: Duration,
Expand All @@ -93,6 +94,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
val customDnsResolver: Option[RequestKey => Either[Throwable, InetSocketAddress]],
val retries: Int,
val maxIdleDuration: Duration,
val maxBorrowDuration: Duration,
)(implicit protected val F: Async[F])
extends BlazeBackendBuilder[Client[F]]
with BackendBuilder[F, Client[F]] {
Expand Down Expand Up @@ -148,6 +150,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
customDnsResolver = customDnsResolver,
retries = 0,
maxIdleDuration = Duration.Inf,
maxBorrowDuration = Duration.Inf,
)(F)

private def copy(
Expand Down Expand Up @@ -175,6 +178,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
customDnsResolver,
retries: Int = retries,
maxIdleDuration: Duration = maxIdleDuration,
maxBorrowDuration: Duration = maxBorrowDuration,
): BlazeClientBuilder[F] =
new BlazeClientBuilder[F](
responseHeaderTimeout = responseHeaderTimeout,
Expand All @@ -200,6 +204,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
customDnsResolver = customDnsResolver,
retries = retries,
maxIdleDuration = maxIdleDuration,
maxBorrowDuration = maxBorrowDuration,
) {}

@deprecated(
Expand Down Expand Up @@ -434,6 +439,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
requestTimeout = requestTimeout,
executionContext = executionContext,
maxIdleDuration = maxIdleDuration,
maxBorrowDuration = maxBorrowDuration,
)
)
)(_.shutdown)
Expand Down Expand Up @@ -467,6 +473,7 @@ object BlazeClientBuilder {
customDnsResolver = None,
retries = 2,
maxIdleDuration = Duration.Inf,
maxBorrowDuration = Duration.Inf,
) {}

@deprecated(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ private object ConnectionManager {
* @param maxWaitQueueLimit maximum number requests waiting for a connection at any specific time
* @param maxConnectionsPerRequestKey Map of RequestKey to number of max connections
* @param executionContext `ExecutionContext` where async operations will execute
* @param maxBorrowDuration the maximum time for connection borrowing
*/
def pool[F[_]: Async, A <: Connection[F]](
builder: ConnectionBuilder[F, A],
Expand All @@ -87,6 +88,7 @@ private object ConnectionManager {
requestTimeout: Duration,
executionContext: ExecutionContext,
maxIdleDuration: Duration,
maxBorrowDuration: Duration,
): F[ConnectionManager.Stateful[F, A]] =
Semaphore(1).map { semaphore =>
new PoolManager[F, A](
Expand All @@ -99,6 +101,7 @@ private object ConnectionManager {
semaphore,
executionContext,
maxIdleDuration,
maxBorrowDuration,
)
}

Expand All @@ -121,5 +124,6 @@ private object ConnectionManager {
requestTimeout,
executionContext,
Duration.Inf,
Duration.Inf,
)
}
154 changes: 92 additions & 62 deletions blaze-client/src/main/scala/org/http4s/blaze/client/PoolManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ final case class WaitQueueFullFailure() extends RuntimeException {
}

/** @param maxIdleDuration the maximum time a connection can be idle
* and still be borrowed
* and still be borrowed
* @param maxBorrowDuration the maximum time for connection borrowing
*/
private final class PoolManager[F[_], A <: Connection[F]](
builder: ConnectionBuilder[F, A],
Expand All @@ -49,6 +50,7 @@ private final class PoolManager[F[_], A <: Connection[F]](
semaphore: Semaphore[F],
implicit private val executionContext: ExecutionContext,
maxIdleDuration: Duration,
maxBorrowDuration: Duration,
)(implicit F: Async[F])
extends ConnectionManager.Stateful[F, A] { self =>

Expand All @@ -73,6 +75,7 @@ private final class PoolManager[F[_], A <: Connection[F]](
semaphore,
executionContext,
Duration.Inf,
Duration.Inf,
)(F)

private sealed case class PooledConnection(conn: A, borrowDeadline: Option[Deadline])
Expand Down Expand Up @@ -184,12 +187,14 @@ private final class PoolManager[F[_], A <: Connection[F]](

/** This generates a effect of Next Connection. The following calls are executed asynchronously
* with respect to whenever the execution of this task can occur.
* If borrowing takes more than [[maxBorrowDuration]] (if it's a finite duration) then
* [[ConnectionBorrowingException]] will be thrown.
*
* If the pool is closed the effect failure is executed.
*
* If the pool is not closed then we look for any connections in the idleQueues that match
* the RequestKey requested.
* If a matching connection exists and it is stil open the callback is executed with the connection.
* If a matching connection exists and it is still open the callback is executed with the connection.
* If a matching connection is closed we deallocate and repeat the check through the idleQueues.
* If no matching connection is found, and the pool is not full we create a new Connection to perform
* the request.
Expand All @@ -201,78 +206,100 @@ private final class PoolManager[F[_], A <: Connection[F]](
* @param key The Request Key For The Connection
* @return An effect of NextConnection
*/
def borrow(key: RequestKey): F[NextConnection] =
F.async { callback =>
semaphore.permit.use { _ =>
if (!isClosed) {
def go(): F[Unit] =
getConnectionFromQueue(key).flatMap {
case Some(pooled) if pooled.conn.isClosed =>
F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *>
decrConnection(key) *>
go()

case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) =>
F.delay(
logger.debug(s"Shutting down and evicting expired connection for $key: $stats")
) *>
decrConnection(key) *>
F.delay(pooled.conn.shutdown()) *>
go()

case Some(pooled) =>
F.delay(logger.debug(s"Recycling connection for $key: $stats")) *>
F.delay(callback(Right(NextConnection(pooled.conn, fresh = false))))

case None if numConnectionsCheckHolds(key) =>
F.delay(
logger.debug(s"Active connection not found for $key. Creating new one. $stats")
) *>
createConnection(key, callback)

case None if maxConnectionsPerRequestKey(key) <= 0 =>
F.delay(callback(Left(NoConnectionAllowedException(key))))

case None if curTotal == maxTotal =>
val keys = idleQueues.keys
if (keys.nonEmpty)
def borrow(key: RequestKey): F[NextConnection] = {
val connectionBorrowF =
F.async[NextConnection] { callback =>
semaphore.permit.use { _ =>
if (!isClosed) {
def go(): F[Unit] =
getConnectionFromQueue(key).flatMap {
case Some(pooled) if pooled.conn.isClosed =>
F.delay(logger.debug(s"Evicting closed connection for $key: $stats")) *>
decrConnection(key) *>
go()

case Some(pooled) if pooled.borrowDeadline.exists(_.isOverdue()) =>
F.delay(
logger.debug(
s"Shutting down and evicting expired connection for $key: $stats"
)
) *>
decrConnection(key) *>
F.delay(pooled.conn.shutdown()) *>
go()

case Some(pooled) =>
F.delay(logger.debug(s"Recycling connection for $key: $stats")) *>
F.delay(callback(Right(NextConnection(pooled.conn, fresh = false))))

case None if numConnectionsCheckHolds(key) =>
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats"
s"Active connection not found for $key. Creating new one. $stats"
)
) *>
F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next()).flatMap {
randKey =>
getConnectionFromQueue(randKey).map(
_.fold(
logger.warn(s"No connection to evict from the idleQueue for $randKey")
)(_.conn.shutdown())
) *>
decrConnection(randKey)
} *>
createConnection(key, callback)
else

case None if maxConnectionsPerRequestKey(key) <= 0 =>
F.delay(callback(Left(NoConnectionAllowedException(key))))

case None if curTotal == maxTotal =>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a minor thing that comes to mind: what if we added a guard clause here like

Suggested change
case None if curTotal == maxTotal =>
case None if curTotal == maxTotal && idleQueues.keys.nonEmpty =>

so to expunge the else branch on line 267 on, and make the match branch on line 275 to become
case _ =>
to take care of that?

val keys = idleQueues.keys
if (keys.nonEmpty)
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Evicting random and creating a new connection: $stats"
)
) *>
F.delay(keys.iterator.drop(Random.nextInt(keys.size)).next())
.flatMap { randKey =>
getConnectionFromQueue(randKey).map(
_.fold(
logger
.warn(
s"No connection to evict from the idleQueue for $randKey"
)
)(_.conn.shutdown())
) *>
decrConnection(randKey)
} *>
createConnection(key, callback)
else
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Adding to waitQueue: $stats"
)
) *>
addToWaitQueue(key, callback)

case None => // we're full up. Add to waiting queue.
F.delay(
logger.debug(
s"No connections available for the desired key, $key. Adding to waitQueue: $stats"
s"No connections available for $key. Waiting on new connection: $stats"
)
) *>
addToWaitQueue(key, callback)

case None => // we're full up. Add to waiting queue.
F.delay(
logger.debug(
s"No connections available for $key. Waiting on new connection: $stats"
)
) *>
addToWaitQueue(key, callback)
}

F.delay(logger.debug(s"Requesting connection for $key: $stats")).productR(go()).as(None)
} else
F.delay(callback(Left(new IllegalStateException("Connection pool is closed")))).as(None)
}

F.delay(logger.debug(s"Requesting connection for $key: $stats"))
.productR(go())
.as(None)
} else
F.delay(callback(Left(new IllegalStateException("Connection pool is closed"))))
.as(None)
}
}

maxBorrowDuration match {
case d: FiniteDuration =>
connectionBorrowF.timeoutTo(
d,
F.defer(F.raiseError(ConnectionBorrowingException(key))),
)
case _ =>
connectionBorrowF
}
}

private def releaseRecyclable(key: RequestKey, connection: A): F[Unit] =
F.delay(waitQueue.dequeueFirst(_.key == key)).flatMap {
Expand Down Expand Up @@ -451,3 +478,6 @@ private final class PoolManager[F[_], A <: Connection[F]](

final case class NoConnectionAllowedException(key: RequestKey)
extends IllegalArgumentException(s"No client connections allowed to $key")

final case class ConnectionBorrowingException(key: RequestKey)
extends IllegalStateException(s"Requesting connection for $key has exceeded the timeout")
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax {
requestTimeout: Duration = Duration.Inf,
builder: ConnectionBuilder[IO, TestConnection] = _ => IO(new TestConnection()),
maxIdleDuration: Duration = Duration.Inf,
maxBorrowDuration: Duration = Duration.Inf,
) =
ConnectionManager.pool(
builder = builder,
Expand All @@ -61,6 +62,7 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax {
requestTimeout = requestTimeout,
executionContext = ExecutionContext.Implicits.global,
maxIdleDuration = maxIdleDuration,
maxBorrowDuration = maxBorrowDuration,
)

test("A pool manager should wait up to maxWaitQueueLimit") {
Expand Down Expand Up @@ -262,4 +264,42 @@ class PoolManagerSuite extends CatsEffectSuite with AllSyntax {
_ <- pool.borrow(key)
} yield assert(conn1.connection.isClosed)
}

test("Should not borrow connection if it exceeds the maxBorrowDuration") {
val connectionBuilder: ConnectionBuilder[IO, TestConnection] =
_ => IO.sleep(50.milliseconds) *> IO(new TestConnection())

for {
pool <- mkPool(
maxTotal = 1,
builder = connectionBuilder,
maxIdleDuration = Duration.Inf,
maxBorrowDuration = 10.milliseconds,
)
_ <- pool
.borrow(key)
.as(false)
.recover { case ConnectionBorrowingException(_) =>
true
}
.assert
} yield ()
}

test("Should borrow connection if it doesn't exceed the maxBorrowDuration") {
for {
pool <- mkPool(
maxTotal = 1,
maxIdleDuration = Duration.Inf,
maxBorrowDuration = 2.seconds,
)
_ <- pool
.borrow(key)
.as(true)
.recover { case ConnectionBorrowingException(_) =>
false
}
.assert
} yield ()
}
}
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ lazy val blazeClient = project
),
mimaBinaryIssueFilters ++= Seq(
// private constructor
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.BlazeClientBuilder.this"),
ProblemFilters
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.BlazeClientBuilder.this"),
ProblemFilters
Expand Down Expand Up @@ -284,12 +286,16 @@ lazy val blazeClient = project
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.PoolManager.invalidate"),
ProblemFilters
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.BasicManager.this"),
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.ConnectionManager.pool"),
ProblemFilters
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.ConnectionManager.pool"),
ProblemFilters
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.ConnectionManager.basic"),
ProblemFilters
.exclude[IncompatibleMethTypeProblem]("org.http4s.blaze.client.PoolManager.this"),
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.PoolManager.this"),
// inside private trait/clas/object
ProblemFilters
.exclude[DirectMissingMethodProblem]("org.http4s.blaze.client.BlazeConnection.runRequest"),
Expand Down