Format code before release
mathieuancelin committed Jan 17, 2020
1 parent 09ba161 commit f3ccba5
Showing 8 changed files with 238 additions and 162 deletions.
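Every hunk in this commit is a pure formatting pass: behavior is unchanged, long statements are broken at argument boundaries, chained calls move onto their own lines, and val declarations are re-aligned. The shape is consistent with an automated formatter such as scalafmt, though the commit itself does not name a tool. A distilled before/after of the most common hunk shape, with a local warn standing in for Cluster.logger.warn:

object FormatExample {
  def warn(msg: String): Unit = System.err.println(msg)
  val name = "leader"

  // Before the commit: one long call on a single line.
  warn(s"[$name] Quotas update from worker ran over time budget, maybe the datastore is slow ?")

  // After the commit: the argument moves to its own line once the
  // call exceeds the formatter's line-length limit.
  warn(
    s"[$name] Quotas update from worker ran over time budget, maybe the datastore is slow ?"
  )
}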
111 changes: 62 additions & 49 deletions otoroshi/app/cluster/cluster.scala
@@ -696,7 +696,7 @@ class ClusterController(ApiAction: ApiAction, cc: ControllerComponents)(
case Leader => {
// Cluster.logger.trace(s"[${env.clusterConfig.mode.name}] updating quotas")
val budget: Long = ctx.request.getQueryString("budget").map(_.toLong).getOrElse(2000L)
val start: Long = System.currentTimeMillis()
val start: Long = System.currentTimeMillis()
val bytesCounter = new AtomicLong(0L)
env.datastores.globalConfigDataStore.singleton().flatMap { config =>
ctx.request.body
@@ -758,7 +758,9 @@ class ClusterController(ApiAction: ApiAction, cc: ControllerComponents)(
Cluster.logger.trace(s"[${env.clusterConfig.mode.name}] updated quotas (${bytesCounter.get()} b)")
env.datastores.clusterStateDataStore.updateDataIn(bytesCounter.get())
if ((System.currentTimeMillis() - start) > budget) {
Cluster.logger.warn(s"[${env.clusterConfig.mode.name}] Quotas update from worker ran over time budget, maybe the datastore is slow ?")
Cluster.logger.warn(
s"[${env.clusterConfig.mode.name}] Quotas update from worker ran over time budget, maybe the datastore is slow ?"
)
}
}
.map(_ => Ok(Json.obj("done" -> true)))
@@ -797,7 +799,7 @@ class ClusterController(ApiAction: ApiAction, cc: ControllerComponents)(
case Leader => {

val budget: Long = ctx.request.getQueryString("budget").map(_.toLong).getOrElse(2000L)
val cachedValue = cachedRef.get()
val cachedValue = cachedRef.get()

ctx.request.headers.get(ClusterAgent.OtoroshiWorkerNameHeader).map { name =>
env.datastores.clusterStateDataStore.registerMember(
@@ -816,40 +818,43 @@ class ClusterController(ApiAction: ApiAction, cc: ControllerComponents)(
// Cluster.logger.debug(s"[${env.clusterConfig.mode.name}] Exporting raw state")
if (caching.compareAndSet(false, true)) {
val start: Long = System.currentTimeMillis()
var stateCache = ByteString.empty
var stateCache = ByteString.empty
Ok.sendEntity(
HttpEntity.Streamed(
env.datastores
.rawExport(env.clusterConfig.leader.groupingBy)
.map { item =>
ByteString(Json.stringify(item) + "\n")
}
.via(env.clusterConfig.gzip())
.alsoTo(Sink.foreach(bs => stateCache = stateCache ++ bs))
.alsoTo(Sink.onComplete {
case Success(_) =>
if ((System.currentTimeMillis() - start) > budget) {
Cluster.logger.warn(s"[${env.clusterConfig.mode.name}] Datastore export to worker ran over time budget, maybe the datastore is slow ?")
}
cachedRef.set(stateCache)
cachedAt.set(System.currentTimeMillis())
caching.compareAndSet(true, false)
env.datastores.clusterStateDataStore.updateDataOut(stateCache.size)
env.clusterConfig.leader.stateDumpPath
.foreach(path => Future(Files.write(stateCache.toArray, new File(path))))
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Exported raw state (${stateCache.size / 1024} Kb) in ${System.currentTimeMillis - start} ms."
)
case Failure(e) =>
Cluster.logger.error(s"[${env.clusterConfig.mode.name}] Stream error while exporting raw state",
e)
}),
None,
Some("application/x-ndjson")
HttpEntity.Streamed(
env.datastores
.rawExport(env.clusterConfig.leader.groupingBy)
.map { item =>
ByteString(Json.stringify(item) + "\n")
}
.via(env.clusterConfig.gzip())
.alsoTo(Sink.foreach(bs => stateCache = stateCache ++ bs))
.alsoTo(Sink.onComplete {
case Success(_) =>
if ((System.currentTimeMillis() - start) > budget) {
Cluster.logger.warn(
s"[${env.clusterConfig.mode.name}] Datastore export to worker ran over time budget, maybe the datastore is slow ?"
)
}
cachedRef.set(stateCache)
cachedAt.set(System.currentTimeMillis())
caching.compareAndSet(true, false)
env.datastores.clusterStateDataStore.updateDataOut(stateCache.size)
env.clusterConfig.leader.stateDumpPath
.foreach(path => Future(Files.write(stateCache.toArray, new File(path))))
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Exported raw state (${stateCache.size / 1024} Kb) in ${System.currentTimeMillis - start} ms."
)
case Failure(e) =>
Cluster.logger.error(s"[${env.clusterConfig.mode.name}] Stream error while exporting raw state",
e)
}),
None,
Some("application/x-ndjson")
)
)
).withHeaders(
"X-Data-From" -> s"${System.currentTimeMillis()}"
) //.withHeaders("Content-Encoding" -> "gzip")
.withHeaders(
"X-Data-From" -> s"${System.currentTimeMillis()}"
) //.withHeaders("Content-Encoding" -> "gzip")
} else {
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Sending state from cache (${cachedValue.size / 1024} Kb) ..."
@@ -862,27 +867,32 @@ class ClusterController(ApiAction: ApiAction, cc: ControllerComponents)(
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Sending state from auto cache (${cachedValue.size / 1024} Kb) ..."
)
Ok.sendEntity(HttpEntity.Streamed(Source.single(env.clusterLeaderAgent.cachedState), None, Some("application/x-ndjson"))).withHeaders(
"X-Data-From" -> s"${env.clusterLeaderAgent.cachedTimestamp}"
)
Ok.sendEntity(
HttpEntity.Streamed(Source.single(env.clusterLeaderAgent.cachedState), None, Some("application/x-ndjson"))
)
.withHeaders(
"X-Data-From" -> s"${env.clusterLeaderAgent.cachedTimestamp}"
)
} else if (cachedValue == null) {
sendAndCache()
} else if (caching.get()) {
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Sending state from cache (${cachedValue.size / 1024} Kb) ..."
)
Ok.sendEntity(HttpEntity.Streamed(Source.single(cachedValue), None, Some("application/x-ndjson"))).withHeaders(
"X-Data-From" -> s"${cachedAt.get()}"
)
Ok.sendEntity(HttpEntity.Streamed(Source.single(cachedValue), None, Some("application/x-ndjson")))
.withHeaders(
"X-Data-From" -> s"${cachedAt.get()}"
)
} else if ((cachedAt.get() + env.clusterConfig.leader.cacheStateFor) < System.currentTimeMillis()) {
sendAndCache()
} else {
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Sending state from cache (${cachedValue.size / 1024} Kb) ..."
)
Ok.sendEntity(HttpEntity.Streamed(Source.single(cachedValue), None, Some("application/x-ndjson"))).withHeaders(
"X-Data-From" -> s"${cachedAt.get()}"
)
Ok.sendEntity(HttpEntity.Streamed(Source.single(cachedValue), None, Some("application/x-ndjson")))
.withHeaders(
"X-Data-From" -> s"${cachedAt.get()}"
)
}
}
}
@@ -929,7 +939,7 @@ class ClusterLeaderAgent(config: ClusterConfig, env: Env) {
implicit lazy val sched = env.otoroshiScheduler
implicit lazy val _env = env

private val membershipRef = new AtomicReference[Cancellable]()
private val membershipRef = new AtomicReference[Cancellable]()
private val stateUpdaterRef = new AtomicReference[Cancellable]()

private val caching = new AtomicBoolean(false)
@@ -1035,12 +1045,12 @@ class ClusterLeaderAgent(config: ClusterConfig, env: Env) {
}
}

def cachedState = cachedRef.get()
def cachedState = cachedRef.get()
def cachedTimestamp = cachedAt.get()

private def cacheState(): Unit = {
if (caching.compareAndSet(false, true)) {
val start = System.currentTimeMillis()
val start = System.currentTimeMillis()
var stateCache = ByteString.empty
env.datastores
.rawExport(env.clusterConfig.leader.groupingBy)
@@ -1057,10 +1067,13 @@ class ClusterLeaderAgent(config: ClusterConfig, env: Env) {
env.datastores.clusterStateDataStore.updateDataOut(stateCache.size)
env.clusterConfig.leader.stateDumpPath
.foreach(path => Future(Files.write(stateCache.toArray, new File(path))))
Cluster.logger.debug(s"[${env.clusterConfig.mode.name}] Auto-cache updated in ${System.currentTimeMillis() - start} ms.")
Cluster.logger.debug(
s"[${env.clusterConfig.mode.name}] Auto-cache updated in ${System.currentTimeMillis() - start} ms."
)
case Failure(e) =>
Cluster.logger.error(s"[${env.clusterConfig.mode.name}] Stream error while exporting raw state", e)
}).runWith(Sink.ignore)
})
.runWith(Sink.ignore)
}
}
}
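Both the controller's sendAndCache() and ClusterLeaderAgent.cacheState() reformatted above follow the same pattern: an AtomicBoolean guards the export so only one runs at a time, the stream is tapped with alsoTo to accumulate the bytes as they are sent, and Sink.onComplete publishes the cache and releases the guard. A minimal self-contained sketch of that pattern, assuming Akka 2.6+ (an implicit ActorSystem materializes the stream), with rawExport() stubbed and the gzip stage elided:

import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

object ExportCachePattern {

  implicit val system: ActorSystem = ActorSystem("export-cache")

  // Shared cache state, mirroring the fields used in cluster.scala.
  private val caching   = new AtomicBoolean(false)
  private val cachedRef = new AtomicReference[ByteString](ByteString.empty)
  private val cachedAt  = new AtomicLong(0L)

  // Stand-in for env.datastores.rawExport(...): any source of JSON lines works.
  private def rawExport(): Source[String, _] =
    Source(List("""{"key":"service1"}""", """{"key":"service2"}"""))

  def cacheState(budgetMs: Long = 2000L): Unit =
    // The compare-and-set guard ensures a single export runs at a time;
    // concurrent callers fall through and serve the previous cached value.
    if (caching.compareAndSet(false, true)) {
      val start      = System.currentTimeMillis()
      var stateCache = ByteString.empty
      rawExport()
        .map(line => ByteString(line + "\n")) // NDJSON framing
        .alsoTo(Sink.foreach(bs => stateCache = stateCache ++ bs))
        .alsoTo(Sink.onComplete {
          case Success(_) =>
            if ((System.currentTimeMillis() - start) > budgetMs)
              System.err.println(s"export ran over the $budgetMs ms budget")
            cachedRef.set(stateCache)                // publish the bytes...
            cachedAt.set(System.currentTimeMillis()) // ...and their timestamp
            caching.compareAndSet(true, false)       // release the guard
          case Failure(e) =>
            // Note: the controller code above resets the flag only on success.
            caching.compareAndSet(true, false)
            e.printStackTrace()
        })
        .runWith(Sink.ignore)
    }
}

Closing over the mutable stateCache var is safe here because the Sink.foreach callback runs sequentially, one element at a time.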
7 changes: 4 additions & 3 deletions otoroshi/app/controllers/ApiController.scala
@@ -730,7 +730,6 @@ class ApiController(ApiAction: ApiAction, UnAuthApiAction: UnAuthApiAction, cc:
}
}


//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

def createService() = ApiAction.async(parse.json) { ctx =>
@@ -2422,7 +2421,8 @@ class ApiController(ApiAction: ApiAction, UnAuthApiAction: UnAuthApiAction, cc:

def createGlobalJwtVerifier() = ApiAction.async(parse.json) { ctx =>
val id = (ctx.request.body \ "id").asOpt[String]
val body = ctx.request.body.as[JsObject] ++ id.map(v => Json.obj("id" -> id)).getOrElse(Json.obj("id" -> IdGenerator.token))
val body = ctx.request.body
.as[JsObject] ++ id.map(v => Json.obj("id" -> id)).getOrElse(Json.obj("id" -> IdGenerator.token))
GlobalJwtVerifier.fromJson(body) match {
case Left(e) => BadRequest(Json.obj("error" -> "Bad GlobalJwtVerifier format")).asFuture
case Right(newVerifier) =>
Expand Down Expand Up @@ -2489,7 +2489,8 @@ class ApiController(ApiAction: ApiAction, UnAuthApiAction: UnAuthApiAction, cc:

def createGlobalAuthModule() = ApiAction.async(parse.json) { ctx =>
val id = (ctx.request.body \ "id").asOpt[String]
val body = ctx.request.body.as[JsObject] ++ id.map(v => Json.obj("id" -> id)).getOrElse(Json.obj("id" -> IdGenerator.token))
val body = ctx.request.body
.as[JsObject] ++ id.map(v => Json.obj("id" -> id)).getOrElse(Json.obj("id" -> IdGenerator.token))
AuthModuleConfig._fmt.reads(body) match {
case JsError(e) => BadRequest(Json.obj("error" -> "Bad GlobalAuthModule format")).asFuture
case JsSuccess(newVerifier, _) =>
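Both createGlobalJwtVerifier() and createGlobalAuthModule() reformatted above rely on the same defaulting idiom: merge the request body with a one-key JsObject so that a caller-supplied "id" is preserved and a fresh one is generated otherwise (JsObject's ++ overrides existing keys, so the right-hand "id" wins). A minimal play-json sketch, with a hypothetical IdGenerator.token standing in for otoroshi's:

import play.api.libs.json.{JsObject, Json}

object IdDefaulting {

  // Hypothetical stand-in for otoroshi's IdGenerator.token.
  object IdGenerator {
    def token: String = java.util.UUID.randomUUID().toString.replace("-", "")
  }

  // Keep the caller-supplied id when present; otherwise generate one.
  def withId(body: JsObject): JsObject = {
    val id = (body \ "id").asOpt[String]
    body ++ id.map(v => Json.obj("id" -> v)).getOrElse(Json.obj("id" -> IdGenerator.token))
  }

  // withId(Json.obj("name" -> "verifier")) adds a fresh "id";
  // withId(Json.obj("id" -> "abc", "name" -> "verifier")) keeps "abc".
}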