Skip to content

Commit

Permalink
Memory leak in akka.actor.LocalActorRef (#5442)
Browse files Browse the repository at this point in the history
* - Replaced the usage of deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage
 - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams

* - Added an execution context to the PoolingRestClient to be able to propagate it correctly from custom implementations
  • Loading branch information
Yevhen Sentiabov authored Sep 29, 2023
1 parent 0c27a65 commit 6f11d48
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ protected class AkkaContainerClient(
port: Int,
timeout: FiniteDuration,
queueSize: Int,
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext)
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec)
with ContainerClient {

def close() = shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class ElasticSearchLogStore(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
elasticSearchConfig.port,
httpFlow)
httpFlow)(system, system.dispatcher)

private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package org.apache.openwhisk.core.containerpool.logging

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Either, Try}

import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow

import scala.concurrent.Promise
import scala.util.Try

import spray.json._

import org.apache.openwhisk.http.PoolingRestClient
import org.apache.openwhisk.http.PoolingRestClient._

Expand Down Expand Up @@ -154,8 +150,9 @@ class ElasticSearchRestClient(
host: String,
port: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
implicit system: ActorSystem)
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
implicit system: ActorSystem,
ec: ExecutionContext)
extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) {

import ElasticSearchJsonProtocol._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
extends PoolingRestClient(protocol, host, port, 16 * 1024) {
extends PoolingRestClient(protocol, host, port, 16 * 1024)(
system,
system.dispatchers.lookup("dispatchers.couch-dispatcher")) {

protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")

// Headers common to all requests.
protected val baseHeaders: List[HttpHeader] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling._
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
import akka.stream.{KillSwitches, QueueOfferResult}
import org.apache.openwhisk.common.AkkaLogging
import spray.json._
import scala.concurrent.{ExecutionContext, Future, Promise}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
Expand All @@ -45,10 +47,10 @@ class PoolingRestClient(
port: Int,
queueSize: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")

protected implicit val context: ExecutionContext = system.dispatcher
private val logging = new AkkaLogging(system.log)

//if specified, override the ClientConnection idle-timeout and keepalive socket option value
private val timeoutSettings = {
Expand All @@ -72,16 +74,19 @@ class PoolingRestClient(
// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
private val requestQueue = Source
.queue(queueSize, OverflowStrategy.dropNew)
private val ((requestQueue, killSwitch), sinkCompletion) = Source
.queue(queueSize)
.via(httpFlow.getOrElse(pool))
.viaMat(KillSwitches.single)(Keep.both)
.toMat(Sink.foreach({
case (Success(response), p) =>
p.success(response)
case (Failure(error), p) =>
p.failure(error)
}))(Keep.left)
.run
}))(Keep.both)
.run()

sinkCompletion.onComplete(_ => shutdown())

/**
* Execute an HttpRequest on the underlying connection pool.
Expand All @@ -96,10 +101,10 @@ class PoolingRestClient(

// When the future completes, we know whether the request made it
// through the queue.
requestQueue.offer(request -> promise).flatMap {
requestQueue.offer(request -> promise) match {
case QueueOfferResult.Enqueued => promise.future
case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full."))
case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
case QueueOfferResult.Failure(f) => Future.failed(f)
}
}
Expand Down Expand Up @@ -127,7 +132,13 @@ class PoolingRestClient(
}
}

def shutdown(): Future[Unit] = Future.unit
def shutdown(): Future[Unit] = {
killSwitch.shutdown()
Try(requestQueue.complete()).recover {
case t: IllegalStateException => logging.warn(this, t.getMessage)
}
Future.unit
}
}

object PoolingRestClient {
Expand Down

0 comments on commit 6f11d48

Please sign in to comment.