Skip to content

Commit

Permalink
Fix thread leak issue
Browse files Browse the repository at this point in the history
  • Loading branch information
kpritam committed Aug 6, 2019
1 parent 98b19ef commit bb82d3b
Showing 1 changed file with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.tmt.embedded_keycloak.impl

import java.util.concurrent.atomic.AtomicInteger
import java.util.{Timer, TimerTask}

import org.tmt.embedded_keycloak.Settings
Expand All @@ -12,18 +13,32 @@ import scala.util.Try
import scala.util.control.NonFatal

private[embedded_keycloak] class HealthCheck(settings: Settings) {
def checkHealth(): Future[Unit] = retry[Response](makeCall()).map(_ => ())

private def retry[T](f: => Future[T], attempts: Int = 10, interval: FiniteDuration = 3.seconds): Future[T] = {
/**
* This ID is used to generate thread names.
*/
private val nextSerialNumber = new AtomicInteger(0)
private def serialNumber = nextSerialNumber.getAndIncrement

def checkHealth(): Future[Unit] = {
val timer = new Timer(s"embedded-keyclock-timer-$serialNumber")
retry[Response](makeCall(), timer = timer)
.transform(_ => Try(timer.cancel()))
}

private def retry[T](f: => Future[T],
attempts: Int = 10,
interval: FiniteDuration = 3.seconds,
timer: Timer): Future[T] = {
f.recoverWith {
case NonFatal(_) if attempts > 0 => delay(interval) { retry(f, attempts - 1, interval) }.flatten
case NonFatal(_) if attempts > 0 =>
delay(interval, timer) { retry(f, attempts - 1, interval, timer) }.flatten
}
}

def delay[T](delay: FiniteDuration)(block: => T): Future[T] = {
def delay[T](delay: FiniteDuration, timer: Timer)(block: => T): Future[T] = {
val promise = Promise[T]()
val t = new Timer()
t.schedule(new TimerTask {
timer.schedule(new TimerTask {
override def run(): Unit = {
promise.complete(Try(block))
}
Expand Down

0 comments on commit bb82d3b

Please sign in to comment.