diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala index 8dbc41f8a1d..03dcf78330f 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala @@ -70,8 +70,8 @@ protected class AkkaContainerClient( port: Int, timeout: FiniteDuration, queueSize: Int, - retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem) - extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout)) + retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext) + extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec) with ContainerClient { def close() = shutdown() diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala index 65033d3db97..7c2faa9080e 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala @@ -79,7 +79,7 @@ class ElasticSearchLogStore( elasticSearchConfig.protocol, elasticSearchConfig.host, elasticSearchConfig.port, - httpFlow) + httpFlow)(system, system.dispatcher) private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs = ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString)) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala index edd7d857ffa..04343dcddf6 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala @@ -17,20 +17,16 @@ package org.apache.openwhisk.core.containerpool.logging -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Either, Try} - import akka.actor.ActorSystem import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpMethods.{GET, POST} import akka.http.scaladsl.model.headers.Accept import akka.stream.scaladsl.Flow -import scala.concurrent.Promise import scala.util.Try - import spray.json._ - import org.apache.openwhisk.http.PoolingRestClient import org.apache.openwhisk.http.PoolingRestClient._ @@ -154,8 +150,9 @@ class ElasticSearchRestClient( host: String, port: Int, httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)( - implicit system: ActorSystem) - extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) { + implicit system: ActorSystem, + ec: ExecutionContext) + extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) { import ElasticSearchJsonProtocol._ diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala index ff0e74b7319..823a1679af1 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala @@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future} class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)( implicit system: ActorSystem, logging: Logging) - extends PoolingRestClient(protocol, host, port, 16 * 1024) { + extends PoolingRestClient(protocol, host, port, 16 * 1024)( + system, + system.dispatchers.lookup("dispatchers.couch-dispatcher")) { - protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher") + protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher") // Headers common to all requests. protected val baseHeaders: List[HttpHeader] = diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala index 8d866542bf2..cd8d5a77e61 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala @@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.unmarshalling._ -import akka.stream.{OverflowStrategy, QueueOfferResult} import akka.stream.scaladsl.{Flow, _} +import akka.stream.{KillSwitches, QueueOfferResult} +import org.apache.openwhisk.common.AkkaLogging import spray.json._ -import scala.concurrent.{ExecutionContext, Future, Promise} + import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success, Try} /** @@ -45,10 +47,10 @@ class PoolingRestClient( port: Int, queueSize: Int, httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, - timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) { + timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) { require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") - protected implicit val context: ExecutionContext = system.dispatcher + private val logging = new AkkaLogging(system.log) //if specified, override the ClientConnection idle-timeout and keepalive socket option value private val timeoutSettings = { @@ -72,16 +74,19 @@ class PoolingRestClient( // Additional queue in case all connections are busy. Should hardly ever be // filled in practice but can be useful, e.g., in tests starting many // asynchronous requests in a very short period of time. - private val requestQueue = Source - .queue(queueSize, OverflowStrategy.dropNew) + private val ((requestQueue, killSwitch), sinkCompletion) = Source + .queue(queueSize) .via(httpFlow.getOrElse(pool)) + .viaMat(KillSwitches.single)(Keep.both) .toMat(Sink.foreach({ case (Success(response), p) => p.success(response) case (Failure(error), p) => p.failure(error) - }))(Keep.left) - .run + }))(Keep.both) + .run() + + sinkCompletion.onComplete(_ => shutdown()) /** * Execute an HttpRequest on the underlying connection pool. @@ -96,10 +101,10 @@ class PoolingRestClient( // When the future completes, we know whether the request made it // through the queue. - requestQueue.offer(request -> promise).flatMap { + requestQueue.offer(request -> promise) match { case QueueOfferResult.Enqueued => promise.future - case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full.")) - case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed.")) + case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full.")) + case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed.")) case QueueOfferResult.Failure(f) => Future.failed(f) } } @@ -127,7 +132,13 @@ class PoolingRestClient( } } - def shutdown(): Future[Unit] = Future.unit + def shutdown(): Future[Unit] = { + killSwitch.shutdown() + Try(requestQueue.complete()).recover { + case t: IllegalStateException => logging.warn(this, t.getMessage) + } + Future.unit + } } object PoolingRestClient {