Skip to content

Commit

Permalink
Format code before release
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Aug 6, 2021
1 parent 3bb39bc commit 5cdbc8d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 55 deletions.
48 changes: 40 additions & 8 deletions otoroshi/app/cluster/cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
def isLoginTokenValid(token: String): Future[Boolean] = {
if (env.clusterConfig.mode.isWorker) {
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-login-token-valid") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-login-token-valid"
) { tryCount =>
Cluster.logger.debug(s"Checking if login token $token is valid with a leader")
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/login-tokens/$token", config.mtlsConfig)
Expand Down Expand Up @@ -916,7 +921,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
def getUserToken(token: String): Future[Option[JsValue]] = {
if (env.clusterConfig.mode.isWorker) {
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-user-token-get") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-user-token-get"
) { tryCount =>
Cluster.logger.debug(s"Checking if user token $token is valid with a leader")
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/user-tokens/$token", config.mtlsConfig)
Expand Down Expand Up @@ -951,7 +961,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
if (env.clusterConfig.mode.isWorker) {
Cluster.logger.debug(s"Creating login token for $token on the leader")
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-login-token") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-create-login-token"
) { tryCount =>
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/login-tokens/$token", config.mtlsConfig)
.withHttpHeaders(
Expand Down Expand Up @@ -980,7 +995,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
if (env.clusterConfig.mode.isWorker) {
Cluster.logger.debug(s"Creating user token for ${token} on the leader: ${Json.prettyPrint(user)}")
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-user-token") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-create-user-token"
) { tryCount =>
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/user-tokens", config.mtlsConfig)
.withHttpHeaders(
Expand Down Expand Up @@ -1008,7 +1028,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
def isSessionValid(id: String): Future[Option[PrivateAppsUser]] = {
if (env.clusterConfig.mode.isWorker) {
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-session-valid") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-session-valid"
) { tryCount =>
Cluster.logger.debug(s"Checking if session $id is valid with a leader")
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/sessions/$id", config.mtlsConfig)
Expand Down Expand Up @@ -1043,7 +1068,12 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
if (env.clusterConfig.mode.isWorker) {
Cluster.logger.debug(s"Creating session for ${user.email} on the leader: ${Json.prettyPrint(user.json)}")
Retry
.retry(times = config.worker.retries, delay = config.retryDelay, factor = config.retryFactor, ctx = "leader-create-session") { tryCount =>
.retry(
times = config.worker.retries,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-create-session"
) { tryCount =>
env.MtlsWs
.url(otoroshiUrl + s"/api/cluster/sessions", config.mtlsConfig)
.withHttpHeaders(
Expand Down Expand Up @@ -1134,7 +1164,8 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
Retry
.retry(
times = if (cannotServeRequests()) 10 else config.worker.state.retries,
delay = config.retryDelay, factor = config.retryFactor,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-fetch-state"
) { tryCount =>
env.MtlsWs
Expand Down Expand Up @@ -1277,7 +1308,8 @@ class ClusterAgent(config: ClusterConfig, env: Env) {
Retry
.retry(
times = if (cannotServeRequests()) 10 else config.worker.quotas.retries,
delay = config.retryDelay, factor = config.retryFactor,
delay = config.retryDelay,
factor = config.retryFactor,
ctx = "leader-push-quotas"
) { tryCount =>
Cluster.logger.trace(
Expand Down
28 changes: 14 additions & 14 deletions otoroshi/app/models/descriptor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ object ClientConfig {
cacheConnectionSettings = CacheConnectionSettings(
enabled = (json \ "cacheConnectionSettings" \ "enabled").asOpt[Boolean].getOrElse(false),
queueSize = (json \ "cacheConnectionSettings" \ "queueSize").asOpt[Int].getOrElse(2048),
strategy = OverflowStrategy.dropNew,
strategy = OverflowStrategy.dropNew
),
customTimeouts = (json \ "customTimeouts")
.asOpt[JsArray]
Expand All @@ -818,19 +818,19 @@ object ClientConfig {

override def writes(o: ClientConfig): JsValue =
Json.obj(
"useCircuitBreaker" -> o.useCircuitBreaker,
"retries" -> o.retries,
"maxErrors" -> o.maxErrors,
"retryInitialDelay" -> o.retryInitialDelay,
"backoffFactor" -> o.backoffFactor,
"callTimeout" -> o.callTimeout,
"callAndStreamTimeout" -> o.callAndStreamTimeout,
"connectionTimeout" -> o.connectionTimeout,
"idleTimeout" -> o.idleTimeout,
"globalTimeout" -> o.globalTimeout,
"sampleInterval" -> o.sampleInterval,
"proxy" -> o.proxy.map(p => WSProxyServerJson.proxyToJson(p)).getOrElse(Json.obj()).as[JsValue],
"customTimeouts" -> JsArray(o.customTimeouts.map(_.toJson)),
"useCircuitBreaker" -> o.useCircuitBreaker,
"retries" -> o.retries,
"maxErrors" -> o.maxErrors,
"retryInitialDelay" -> o.retryInitialDelay,
"backoffFactor" -> o.backoffFactor,
"callTimeout" -> o.callTimeout,
"callAndStreamTimeout" -> o.callAndStreamTimeout,
"connectionTimeout" -> o.connectionTimeout,
"idleTimeout" -> o.idleTimeout,
"globalTimeout" -> o.globalTimeout,
"sampleInterval" -> o.sampleInterval,
"proxy" -> o.proxy.map(p => WSProxyServerJson.proxyToJson(p)).getOrElse(Json.obj()).as[JsValue],
"customTimeouts" -> JsArray(o.customTimeouts.map(_.toJson)),
"cacheConnectionSettings" -> o.cacheConnectionSettings.json
)
}
Expand Down
105 changes: 75 additions & 30 deletions otoroshi/app/utils/httpclient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem
connectionContextLooseHolder.set(connectionContextLoose)
}
val pool = customizer(connectionPoolSettings).withMaxConnections(512)
val cctx = if (loose) connectionContextLooseHolder.get() else connectionContextHolder.get()
val cctx = if (loose) connectionContextLooseHolder.get() else connectionContextHolder.get()
if (clientConfig.cacheConnectionSettings.enabled) {
queueClientRequest(request, pool, cctx, clientConfig.cacheConnectionSettings)
} else {
Expand Down Expand Up @@ -746,36 +746,61 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem

private val queueCache = new TrieMap[String, SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])]]()

private def getQueue(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: HttpsConnectionContext, queueSettings: CacheConnectionSettings): SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = {
val host = request.uri.authority.host.toString()
val port = request.uri.authority.port
private def getQueue(
request: HttpRequest,
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
queueSettings: CacheConnectionSettings
): SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] = {
val host = request.uri.authority.host.toString()
val port = request.uri.authority.port
val isHttps = request.uri.scheme.equalsIgnoreCase("https")
val key = s"$host-$port-$isHttps-${queueSettings.queueSize}"
queueCache.getOrElseUpdate(key, {
logger.debug(s"create host connection cache queue for '$key'")
val pool = if (isHttps) {
client.cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port, connectionContext = connectionContext, settings = settings)
} else {
client.cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = settings)
val key = s"$host-$port-$isHttps-${queueSettings.queueSize}"
queueCache.getOrElseUpdate(
key, {
logger.debug(s"create host connection cache queue for '$key'")
val pool = if (isHttps) {
client.cachedHostConnectionPoolHttps[Promise[HttpResponse]](
host = host,
port = port,
connectionContext = connectionContext,
settings = settings
)
} else {
client.cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = settings)
}
Source
.queue[(HttpRequest, Promise[HttpResponse])](queueSettings.queueSize, queueSettings.strategy)
.via(pool)
.to(Sink.foreach {
case (Success(resp), p) => p.success(resp)
case (Failure(e), p) => p.failure(e)
})
.run()
}
Source.queue[(HttpRequest, Promise[HttpResponse])](queueSettings.queueSize, queueSettings.strategy)
.via(pool)
.to(Sink.foreach {
case (Success(resp), p) => p.success(resp)
case (Failure(e), p) => p.failure(e)
}).run()
})
)
}

private def queueClientRequest(request: HttpRequest, settings: ConnectionPoolSettings, connectionContext: HttpsConnectionContext, queueSettings: CacheConnectionSettings): Future[HttpResponse] = {
val queue = getQueue(request, settings, connectionContext, queueSettings)
private def queueClientRequest(
request: HttpRequest,
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
queueSettings: CacheConnectionSettings
): Future[HttpResponse] = {
val queue = getQueue(request, settings, connectionContext, queueSettings)
val responsePromise = Promise[HttpResponse]()
queue.offer((request, responsePromise)).flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped => FastFuture.failed(ClientQueueError("Client queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => FastFuture.failed(ClientQueueError(ex.getMessage))
case QueueOfferResult.QueueClosed => FastFuture.failed(ClientQueueError("Client queue was closed (pool shut down) while running the request. Try again later."))
}(ec)
queue
.offer((request, responsePromise))
.flatMap {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped =>
FastFuture.failed(ClientQueueError("Client queue overflowed. Try again later."))
case QueueOfferResult.Failure(ex) => FastFuture.failed(ClientQueueError(ex.getMessage))
case QueueOfferResult.QueueClosed =>
FastFuture.failed(
ClientQueueError("Client queue was closed (pool shut down) while running the request. Try again later.")
)
}(ec)
}

private[utils] def executeWsRequest[T](
Expand Down Expand Up @@ -836,10 +861,14 @@ class AkkWsClient(config: WSClientConfig, env: Env)(implicit system: ActorSystem

case class ClientQueueError(message: String) extends RuntimeException(message) with NoStackTrace

case class CacheConnectionSettings(enabled: Boolean = false, queueSize: Int = 2048, strategy: OverflowStrategy = OverflowStrategy.dropNew) {
case class CacheConnectionSettings(
enabled: Boolean = false,
queueSize: Int = 2048,
strategy: OverflowStrategy = OverflowStrategy.dropNew
) {
def json: JsValue = {
Json.obj(
"enabled" -> enabled,
"enabled" -> enabled,
"queueSize" -> queueSize
)
}
Expand Down Expand Up @@ -1158,7 +1187,15 @@ case class AkkaWsClientRequest(
.flatMap(_ => FastFuture.failed(RequestTimeoutException))
val start = System.currentTimeMillis()
val reqExec = client
.executeRequest(req, targetOpt.exists(_.mtlsConfig.loose), trustAll, certs, trustedCerts, clientConfig, customizer)
.executeRequest(
req,
targetOpt.exists(_.mtlsConfig.loose),
trustAll,
certs,
trustedCerts,
clientConfig,
customizer
)
.flatMap { resp =>
val remaining = zeTimeout.toMillis - (System.currentTimeMillis() - start)
if (alreadyFailed.get()) {
Expand Down Expand Up @@ -1209,7 +1246,15 @@ case class AkkaWsClientRequest(
.flatMap(_ => FastFuture.failed(RequestTimeoutException))
val start = System.currentTimeMillis()
val reqExec = client
.executeRequest(buildRequest(), targetOpt.exists(_.mtlsConfig.loose), trustAll, certs, trustedCerts, clientConfig, customizer)
.executeRequest(
buildRequest(),
targetOpt.exists(_.mtlsConfig.loose),
trustAll,
certs,
trustedCerts,
clientConfig,
customizer
)
.flatMap { response: HttpResponse =>
// FiniteDuration(client.wsClientConfig.requestTimeout._1, client.wsClientConfig.requestTimeout._2)
val remaining = zeTimeout.toMillis - (System.currentTimeMillis() - start)
Expand Down
10 changes: 7 additions & 3 deletions otoroshi/javascript/src/pages/ServicePage.js
Original file line number Diff line number Diff line change
Expand Up @@ -2721,12 +2721,16 @@ export class ServicePage extends Component {
value={this.state.service.clientConfig.useCircuitBreaker}
onChange={(v) => this.changeTheValue('clientConfig.useCircuitBreaker', v)}
/>
{this.state.service.useAkkaHttpClient && <BooleanInput
{this.state.service.useAkkaHttpClient && (
<BooleanInput
label="Cache connections"
help="Use a cache at host connection level to avoid reconnection time"
value={this.state.service.clientConfig.cacheConnectionSettings.enabled}
onChange={(v) => this.changeTheValue('clientConfig.cacheConnectionSettings.enabled', v)}
/>}
onChange={(v) =>
this.changeTheValue('clientConfig.cacheConnectionSettings.enabled', v)
}
/>
)}
<NumberInput
suffix="times"
label="Client attempts"
Expand Down

0 comments on commit 5cdbc8d

Please sign in to comment.