diff --git a/otoroshi/app/cluster/cluster.scala b/otoroshi/app/cluster/cluster.scala index b15f059b3b..746b255c7b 100644 --- a/otoroshi/app/cluster/cluster.scala +++ b/otoroshi/app/cluster/cluster.scala @@ -882,7 +882,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { def isLoginTokenValid(token: String): Future[Boolean] = { if (env.clusterConfig.mode.isWorker) { Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-login-token-valid") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-login-token-valid" + ) { tryCount => Cluster.logger.debug(s"Checking if login token $token is valid with a leader") env.MtlsWs .url(otoroshiUrl + s"/api/cluster/login-tokens/$token", config.mtlsConfig) @@ -916,7 +921,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { def getUserToken(token: String): Future[Option[JsValue]] = { if (env.clusterConfig.mode.isWorker) { Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-user-token-get") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-user-token-get" + ) { tryCount => Cluster.logger.debug(s"Checking if user token $token is valid with a leader") env.MtlsWs .url(otoroshiUrl + s"/api/cluster/user-tokens/$token", config.mtlsConfig) @@ -951,7 +961,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { if (env.clusterConfig.mode.isWorker) { Cluster.logger.debug(s"Creating login token for $token on the leader") Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-login-token") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-create-login-token" + ) { tryCount => env.MtlsWs .url(otoroshiUrl + s"/api/cluster/login-tokens/$token", config.mtlsConfig) .withHttpHeaders( @@ -980,7 +995,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { if (env.clusterConfig.mode.isWorker) { Cluster.logger.debug(s"Creating user token for ${token} on the leader: ${Json.prettyPrint(user)}") Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-user-token") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-create-user-token" + ) { tryCount => env.MtlsWs .url(otoroshiUrl + s"/api/cluster/user-tokens", config.mtlsConfig) .withHttpHeaders( @@ -1008,7 +1028,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { def isSessionValid(id: String): Future[Option[PrivateAppsUser]] = { if (env.clusterConfig.mode.isWorker) { Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-session-valid") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-session-valid" + ) { tryCount => Cluster.logger.debug(s"Checking if session $id is valid with a leader") env.MtlsWs .url(otoroshiUrl + s"/api/cluster/sessions/$id", config.mtlsConfig) @@ -1043,7 +1068,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) { if (env.clusterConfig.mode.isWorker) { Cluster.logger.debug(s"Creating session for ${user.email} on the leader: ${Json.prettyPrint(user.json)}") Retry - .retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-session") { tryCount => + .retry( + times = config.worker.retries, + delay = config.retryDelay, + factor = config.retryFactor, + ctx = "leader-create-session" + ) { tryCount => env.MtlsWs .url(otoroshiUrl + s"/api/cluster/sessions", config.mtlsConfig) .withHttpHeaders( @@ -1134,7 +1164,8 @@ class ClusterAgent(config: ClusterConfig, env: Env) { Retry .retry( times = if (cannotServeRequests()) 10 else config.worker.state.retries, - delay = config.retryDelay, factor = config.retryFactor, + delay = config.retryDelay, + factor = config.retryFactor, ctx = "leader-fetch-state" ) { tryCount => env.MtlsWs @@ -1277,7 +1308,8 @@ class ClusterAgent(config: ClusterConfig, env: Env) { Retry .retry( times = if (cannotServeRequests()) 10 else config.worker.quotas.retries, - delay = config.retryDelay, factor = config.retryFactor, + delay = config.retryDelay, + factor = config.retryFactor, ctx = "leader-push-quotas" ) { tryCount => Cluster.logger.trace( diff --git a/otoroshi/app/models/descriptor.scala b/otoroshi/app/models/descriptor.scala index b4d03a685d..a994da6a29 100644 --- a/otoroshi/app/models/descriptor.scala +++ b/otoroshi/app/models/descriptor.scala @@ -802,7 +802,7 @@ object ClientConfig { cacheConnectionSettings = CacheConnectionSettings( enabled = (json \ "cacheConnectionSettings" \ "enabled").asOpt[Boolean].getOrElse(false), queueSize = (json \ "cacheConnectionSettings" \ "queueSize").asOpt[Int].getOrElse(2048), - strategy = OverflowStrategy.dropNew, + strategy = OverflowStrategy.dropNew ), customTimeouts = (json \ "customTimeouts") .asOpt[JsArray] @@ -818,19 +818,19 @@ object ClientConfig { override def writes(o: ClientConfig): JsValue = Json.obj( - "useCircuitBreaker" -> o.useCircuitBreaker, - "retries" -> o.retries, - "maxErrors" -> o.maxErrors, - "retryInitialDelay" -> o.retryInitialDelay, - "backoffFactor" -> o.backoffFactor, - "callTimeout" -> o.callTimeout, - "callAndStreamTimeout" -> o.callAndStreamTimeout, - "connectionTimeout" -> o.connectionTimeout, - "idleTimeout" -> o.idleTimeout, - "globalTimeout" -> o.globalTimeout, - "sampleInterval" -> o.sampleInterval, - "proxy" -> o.proxy.map(p => WSProxyServerJson.proxyToJson(p)).getOrElse(Json.obj()).as[JsValue], - "customTimeouts" -> JsArray(o.customTimeouts.map(_.toJson)), + "useCircuitBreaker" -> o.useCircuitBreaker, + "retries" -> o.retries, + "maxErrors" -> o.maxErrors, + "retryInitialDelay" -> o.retryInitialDelay, + "backoffFactor" -> o.backoffFactor, + "callTimeout" -> o.callTimeout, + "callAndStreamTimeout" -> o.callAndStreamTimeout, + "connectionTimeout" -> o.connectionTimeout, + "idleTimeout" -> o.idleTimeout, + "globalTimeout" -> o.globalTimeout, + "sampleInterval" -> o.sampleInterval, + "proxy" -> o.proxy.map(p => WSProxyServerJson.proxyToJson(p)).getOrElse(Json.obj()).as[JsValue], + "customTimeouts" -> JsArray(o.customTimeouts.map(_.toJson)), "cacheConnectionSettings" -> o.cacheConnectionSettings.json ) } diff --git a/otoroshi/app/utils/httpclient.scala b/otoroshi/app/utils/httpclient.scala index 64994abf52..fe74191d3b 100644 --- a/otoroshi/app/utils/httpclient.scala +++ b/otoroshi/app/utils/httpclient.scala @@ -700,7 +700,7 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem connectionContextLooseHolder.set(connectionContextLoose) } val pool = customizer(connectionPoolSettings).withMaxConnections(512) - val cctx = if (loose) connectionContextLooseHolder.get() else connectionContextHolder.get() + val cctx = if (loose) connectionContextLooseHolder.get() else connectionContextHolder.get() if (clientConfig.cacheConnectionSettings.enabled) { queueClientRequest(request, pool, cctx, clientConfig.cacheConnectionSettings) } else { @@ -746,36 +746,61 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem private val queueCache = new TrieMap[String, SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])]]() - private def getQueue(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: HttpsConnectionContext, queueSettings: CacheConnectionSettings): SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = { - val host = request.uri.authority.host.toString() - val port = request.uri.authority.port + private def getQueue( + request: HttpRequest, + settings: ConnectionPoolSettings, + connectionContext: HttpsConnectionContext, + queueSettings: CacheConnectionSettings + ): SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = { + val host = request.uri.authority.host.toString() + val port = request.uri.authority.port val isHttps = request.uri.scheme.equalsIgnoreCase("https") - val key = s"$host-$port-$isHttps-${queueSettings.queueSize}" - queueCache.getOrElseUpdate(key, { - logger.debug(s"create host connection cache queue for '$key'") - val pool = if (isHttps) { - client.cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port, connectionContext = connectionContext, settings = settings) - } else { - client.cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = settings) + val key = s"$host-$port-$isHttps-${queueSettings.queueSize}" + queueCache.getOrElseUpdate( + key, { + logger.debug(s"create host connection cache queue for '$key'") + val pool = if (isHttps) { + client.cachedHostConnectionPoolHttps[Promise[HttpResponse]]( + host = host, + port = port, + connectionContext = connectionContext, + settings = settings + ) + } else { + client.cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = settings) + } + Source + .queue[(HttpRequest, Promise[HttpResponse])](queueSettings.queueSize, queueSettings.strategy) + .via(pool) + .to(Sink.foreach { + case (Success(resp), p) => p.success(resp) + case (Failure(e), p) => p.failure(e) + }) + .run() } - Source.queue[(HttpRequest, Promise[HttpResponse])](queueSettings.queueSize, queueSettings.strategy) - .via(pool) - .to(Sink.foreach { - case (Success(resp), p) => p.success(resp) - case (Failure(e), p) => p.failure(e) - }).run() - }) + ) } - private def queueClientRequest(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: HttpsConnectionContext, queueSettings: CacheConnectionSettings): Future[HttpResponse] = { - val queue = getQueue(request, settings, connectionContext, queueSettings) + private def queueClientRequest( + request: HttpRequest, + settings: ConnectionPoolSettings, + connectionContext: HttpsConnectionContext, + queueSettings: CacheConnectionSettings + ): Future[HttpResponse] = { + val queue = getQueue(request, settings, connectionContext, queueSettings) val responsePromise = Promise[HttpResponse]() - queue.offer((request, responsePromise)).flatMap { - case QueueOfferResult.Enqueued => responsePromise.future - case QueueOfferResult.Dropped => FastFuture.failed(ClientQueueError("Client queue overflowed. Try again later.")) - case QueueOfferResult.Failure(ex) => FastFuture.failed(ClientQueueError(ex.getMessage)) - case QueueOfferResult.QueueClosed => FastFuture.failed(ClientQueueError("Client queue was closed (pool shut down) while running the request. Try again later.")) - }(ec) + queue + .offer((request, responsePromise)) + .flatMap { + case QueueOfferResult.Enqueued => responsePromise.future + case QueueOfferResult.Dropped => + FastFuture.failed(ClientQueueError("Client queue overflowed. Try again later.")) + case QueueOfferResult.Failure(ex) => FastFuture.failed(ClientQueueError(ex.getMessage)) + case QueueOfferResult.QueueClosed => + FastFuture.failed( + ClientQueueError("Client queue was closed (pool shut down) while running the request. Try again later.") + ) + }(ec) } private[utils] def executeWsRequest[T]( @@ -836,10 +861,14 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem case class ClientQueueError(message: String) extends RuntimeException(message) with NoStackTrace -case class CacheConnectionSettings(enabled: Boolean = false, queueSize: Int = 2048, strategy: OverflowStrategy = OverflowStrategy.dropNew) { +case class CacheConnectionSettings( + enabled: Boolean = false, + queueSize: Int = 2048, + strategy: OverflowStrategy = OverflowStrategy.dropNew +) { def json: JsValue = { Json.obj( - "enabled" -> enabled, + "enabled" -> enabled, "queueSize" -> queueSize ) } @@ -1158,7 +1187,15 @@ case class AkkaWsClientRequest( .flatMap(_ => FastFuture.failed(RequestTimeoutException)) val start = System.currentTimeMillis() val reqExec = client - .executeRequest(req, targetOpt.exists(_.mtlsConfig.loose), trustAll, certs, trustedCerts, clientConfig, customizer) + .executeRequest( + req, + targetOpt.exists(_.mtlsConfig.loose), + trustAll, + certs, + trustedCerts, + clientConfig, + customizer + ) .flatMap { resp => val remaining = zeTimeout.toMillis - (System.currentTimeMillis() - start) if (alreadyFailed.get()) { @@ -1209,7 +1246,15 @@ case class AkkaWsClientRequest( .flatMap(_ => FastFuture.failed(RequestTimeoutException)) val start = System.currentTimeMillis() val reqExec = client - .executeRequest(buildRequest(), targetOpt.exists(_.mtlsConfig.loose), trustAll, certs, trustedCerts, clientConfig, customizer) + .executeRequest( + buildRequest(), + targetOpt.exists(_.mtlsConfig.loose), + trustAll, + certs, + trustedCerts, + clientConfig, + customizer + ) .flatMap { response: HttpResponse => // FiniteDuration(client.wsClientConfig.requestTimeout._1, client.wsClientConfig.requestTimeout._2) val remaining = zeTimeout.toMillis - (System.currentTimeMillis() - start) diff --git a/otoroshi/javascript/src/pages/ServicePage.js b/otoroshi/javascript/src/pages/ServicePage.js index bbae19f3e1..6720f225e5 100644 --- a/otoroshi/javascript/src/pages/ServicePage.js +++ b/otoroshi/javascript/src/pages/ServicePage.js @@ -2721,12 +2721,16 @@ export class ServicePage extends Component { value={this.state.service.clientConfig.useCircuitBreaker} onChange={(v) => this.changeTheValue('clientConfig.useCircuitBreaker', v)} /> - {this.state.service.useAkkaHttpClient && this.changeTheValue('clientConfig.cacheConnectionSettings.enabled', v)} - />} + onChange={(v) => + this.changeTheValue('clientConfig.cacheConnectionSettings.enabled', v) + } + /> + )}