diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 63571da90e..dbbad17e63 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -36,7 +36,7 @@ import cats.{ } import cats.data.Ior import cats.effect.instances.spawn -import cats.effect.std.{Console, Env, UUIDGen} +import cats.effect.std.{Console, Env, UUIDGen, Retry} import cats.effect.tracing.{Tracing, TracingEvent} import cats.syntax.all._ @@ -484,6 +484,10 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] { def recoverWith[B >: A](pf: PartialFunction[Throwable, IO[B]]): IO[B] = handleErrorWith(e => pf.applyOrElse(e, IO.raiseError)) + + def retry(r: Retry[IO]): IO[A] = + Retry.retry(r, this) + def ifM[B](ifTrue: => IO[B], ifFalse: => IO[B])(implicit ev: A <:< Boolean): IO[B] = flatMap(a => if (ev(a)) ifTrue else ifFalse) diff --git a/std/shared/src/main/scala/cats/effect/std/OldRetry.scala b/std/shared/src/main/scala/cats/effect/std/OldRetry.scala new file mode 100644 index 0000000000..037399e6ff --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/OldRetry.scala @@ -0,0 +1,666 @@ +package cats.effect.std +package retry + +import cats._ +import cats.syntax.all._ +import cats.arrow.FunctionK +import cats.kernel.BoundedSemilattice +import cats.effect.kernel.Temporal +import retry.PolicyDecision._ +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Random +import java.util.concurrent.TimeUnit + + +final case class RetryStatus( + retriesSoFar: Int, + cumulativeDelay: FiniteDuration, + previousDelay: Option[FiniteDuration] +) { + def addRetry(delay: FiniteDuration): RetryStatus = RetryStatus( + retriesSoFar = this.retriesSoFar + 1, + cumulativeDelay = this.cumulativeDelay + delay, + previousDelay = Some(delay) + ) +} + +object RetryStatus { + val NoRetriesYet = RetryStatus(0, Duration.Zero, None) +} + +sealed trait PolicyDecision + +object PolicyDecision { + case object GiveUp extends PolicyDecision + + final case class DelayAndRetry( + delay: FiniteDuration + ) extends PolicyDecision +} + +case class RetryPolicy[M[_]]( + decideNextRetry: RetryStatus => M[PolicyDecision] +) { + def show: String = toString + + def followedBy(rp: RetryPolicy[M])(implicit M: Apply[M]): RetryPolicy[M] = + RetryPolicy.withShow( + status => + M.map2(decideNextRetry(status), rp.decideNextRetry(status)) { + case (GiveUp, pd) => pd + case (pd, _) => pd + }, + show"$show.followedBy($rp)" + ) + + /** + * Combine this schedule with another schedule, giving up when either of the schedules want to + * give up and choosing the maximum of the two delays when both of the schedules want to delay + * the next retry. The dual of the `meet` operation. + */ + def join(rp: RetryPolicy[M])(implicit M: Apply[M]): RetryPolicy[M] = + RetryPolicy.withShow[M]( + status => + M.map2(decideNextRetry(status), rp.decideNextRetry(status)) { + case (DelayAndRetry(a), DelayAndRetry(b)) => DelayAndRetry(a max b) + case _ => GiveUp + }, + show"$show.join($rp)" + ) + + /** + * Combine this schedule with another schedule, giving up when both of the schedules want to + * give up and choosing the minimum of the two delays when both of the schedules want to delay + * the next retry. The dual of the `join` operation. + */ + def meet(rp: RetryPolicy[M])(implicit M: Apply[M]): RetryPolicy[M] = + RetryPolicy.withShow[M]( + status => + M.map2(decideNextRetry(status), rp.decideNextRetry(status)) { + case (DelayAndRetry(a), DelayAndRetry(b)) => DelayAndRetry(a min b) + case (s @ DelayAndRetry(_), GiveUp) => s + case (GiveUp, s @ DelayAndRetry(_)) => s + case _ => GiveUp + }, + show"$show.meet($rp)" + ) + + def mapDelay( + f: FiniteDuration => FiniteDuration + )(implicit M: Functor[M]): RetryPolicy[M] = + RetryPolicy.withShow( + status => + M.map(decideNextRetry(status)) { + case GiveUp => GiveUp + case DelayAndRetry(d) => DelayAndRetry(f(d)) + }, + show"$show.mapDelay()" + ) + + def flatMapDelay( + f: FiniteDuration => M[FiniteDuration] + )(implicit M: Monad[M]): RetryPolicy[M] = + RetryPolicy.withShow( + status => + M.flatMap(decideNextRetry(status)) { + case GiveUp => M.pure(GiveUp) + case DelayAndRetry(d) => M.map(f(d))(DelayAndRetry(_)) + }, + show"$show.flatMapDelay()" + ) + + def mapK[N[_]](nt: FunctionK[M, N]): RetryPolicy[N] = + RetryPolicy.withShow( + status => nt(decideNextRetry(status)), + show"$show.mapK()" + ) +} + +object RetryPolicy { + def lift[M[_]]( + f: RetryStatus => PolicyDecision + )(implicit M: Applicative[M]): RetryPolicy[M] = + RetryPolicy[M](decideNextRetry = retryStatus => M.pure(f(retryStatus))) + + def withShow[M[_]]( + decideNextRetry: RetryStatus => M[PolicyDecision], + pretty: => String + ): RetryPolicy[M] = + new RetryPolicy[M](decideNextRetry) { + override def show: String = pretty + override def toString: String = pretty + } + + def liftWithShow[M[_]: Applicative]( + decideNextRetry: RetryStatus => PolicyDecision, + pretty: => String + ): RetryPolicy[M] = + withShow(rs => Applicative[M].pure(decideNextRetry(rs)), pretty) + + implicit def boundedSemilatticeForRetryPolicy[M[_]]( + implicit M: Applicative[M]): BoundedSemilattice[RetryPolicy[M]] = + new BoundedSemilattice[RetryPolicy[M]] { + override def empty: RetryPolicy[M] = + RetryPolicies.constantDelay[M](Duration.Zero) + + override def combine( + x: RetryPolicy[M], + y: RetryPolicy[M] + ): RetryPolicy[M] = x.join(y) + } + + implicit def showForRetryPolicy[M[_]]: Show[RetryPolicy[M]] = + Show.show(_.show) +} + +object RetryPolicies { + private val LongMax: BigInt = BigInt(Long.MaxValue) + + /* + * Multiply the given duration by the given multiplier, but cap the result to + * ensure we don't try to create a FiniteDuration longer than 2^63 - 1 nanoseconds. + * + * Note: despite the "safe" in the name, we can still create an invalid + * FiniteDuration if the multiplier is negative. But an assumption of the library + * as a whole is that nobody would be silly enough to use negative delays. + */ + private def safeMultiply( + duration: FiniteDuration, + multiplier: Long + ): FiniteDuration = { + val durationNanos = BigInt(duration.toNanos) + val resultNanos = durationNanos * BigInt(multiplier) + val safeResultNanos = resultNanos min LongMax + FiniteDuration(safeResultNanos.toLong, TimeUnit.NANOSECONDS) + } + + /** + * Don't retry at all and always give up. Only really useful for combining with other + * policies. + */ + def alwaysGiveUp[M[_]: Applicative]: RetryPolicy[M] = + RetryPolicy.liftWithShow(Function.const(GiveUp), "alwaysGiveUp") + + /** + * Delay by a constant amount before each retry. Never give up. + */ + def constantDelay[M[_]: Applicative](delay: FiniteDuration): RetryPolicy[M] = + RetryPolicy.liftWithShow( + Function.const(DelayAndRetry(delay)), + show"constantDelay($delay)" + ) + + /** + * Each delay is twice as long as the previous one. Never give up. + */ + def exponentialBackoff[M[_]: Applicative]( + baseDelay: FiniteDuration + ): RetryPolicy[M] = + RetryPolicy.liftWithShow( + { status => + val delay = + safeMultiply(baseDelay, Math.pow(2, status.retriesSoFar).toLong) + DelayAndRetry(delay) + }, + show"exponentialBackOff(baseDelay=$baseDelay)" + ) + + /** + * Retry without delay, giving up after the given number of retries. + */ + def limitRetries[M[_]: Applicative](maxRetries: Int): RetryPolicy[M] = + RetryPolicy.liftWithShow( + { status => + if (status.retriesSoFar >= maxRetries) { + GiveUp + } else { + DelayAndRetry(Duration.Zero) + } + }, + show"limitRetries(maxRetries=$maxRetries)" + ) + + /** + * Delay(n) = Delay(n - 2) + Delay(n - 1) + * + * e.g. if `baseDelay` is 10 milliseconds, the delays before each retry will be 10 ms, 10 ms, + * 20 ms, 30ms, 50ms, 80ms, 130ms, ... + */ + def fibonacciBackoff[M[_]: Applicative]( + baseDelay: FiniteDuration + ): RetryPolicy[M] = + RetryPolicy.liftWithShow( + { status => + val delay = + safeMultiply(baseDelay, Fibonacci.fibonacci(status.retriesSoFar + 1)) + DelayAndRetry(delay) + }, + show"fibonacciBackoff(baseDelay=$baseDelay)" + ) + + /** + * "Full jitter" backoff algorithm. See + * https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + */ + def fullJitter[M[_]: Applicative](baseDelay: FiniteDuration): RetryPolicy[M] = + RetryPolicy.liftWithShow( + { status => + val e = Math.pow(2, status.retriesSoFar).toLong + val maxDelay = safeMultiply(baseDelay, e) + val delayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong + DelayAndRetry(new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS)) + }, + show"fullJitter(baseDelay=$baseDelay)" + ) + + /** + * Set an upper bound on any individual delay produced by the given policy. + */ + def capDelay[M[_]: Applicative]( + cap: FiniteDuration, + policy: RetryPolicy[M] + ): RetryPolicy[M] = + policy.meet(constantDelay(cap)) + + /** + * Add an upper bound to a policy such that once the given time-delay amount per try + * has been reached or exceeded, the policy will stop retrying and give up. If you need to + * stop retrying once cumulative delay reaches a time-delay amount, use + * [[limitRetriesByCumulativeDelay]]. + */ + def limitRetriesByDelay[M[_]: Applicative]( + threshold: FiniteDuration, + policy: RetryPolicy[M] + ): RetryPolicy[M] = { + def decideNextRetry(status: RetryStatus): M[PolicyDecision] = + policy.decideNextRetry(status).map { + case r @ DelayAndRetry(delay) => + if (delay > threshold) GiveUp else r + case GiveUp => GiveUp + } + + RetryPolicy.withShow[M]( + decideNextRetry, + show"limitRetriesByDelay(threshold=$threshold, $policy)" + ) + } + + /** + * Add an upperbound to a policy such that once the cumulative delay over all retries has + * reached or exceeded the given limit, the policy will stop retrying and give up. + */ + def limitRetriesByCumulativeDelay[M[_]: Applicative]( + threshold: FiniteDuration, + policy: RetryPolicy[M] + ): RetryPolicy[M] = { + def decideNextRetry(status: RetryStatus): M[PolicyDecision] = + policy.decideNextRetry(status).map { + case r @ DelayAndRetry(delay) => + if (status.cumulativeDelay + delay >= threshold) GiveUp else r + case GiveUp => GiveUp + } + + RetryPolicy.withShow[M]( + decideNextRetry, + show"limitRetriesByCumulativeDelay(threshold=$threshold, $policy)" + ) + } +} + +trait Sleep[M[_]] { + def sleep(delay: FiniteDuration): M[Unit] +} + +object Sleep { + def apply[M[_]](implicit sleep: Sleep[M]): Sleep[M] = sleep + + implicit def sleepUsingTemporal[F[_]](implicit t: Temporal[F]): Sleep[F] = + (delay: FiniteDuration) => t.sleep(delay) +} + +object implicits extends syntax.AllSyntax + +sealed trait RetryDetails { + def retriesSoFar: Int + def cumulativeDelay: FiniteDuration + def givingUp: Boolean + def upcomingDelay: Option[FiniteDuration] +} + +object RetryDetails { + final case class GivingUp( + totalRetries: Int, + totalDelay: FiniteDuration + ) extends RetryDetails { + val retriesSoFar: Int = totalRetries + val cumulativeDelay: FiniteDuration = totalDelay + val givingUp: Boolean = true + val upcomingDelay: Option[FiniteDuration] = None + } + + final case class WillDelayAndRetry( + nextDelay: FiniteDuration, + retriesSoFar: Int, + cumulativeDelay: FiniteDuration + ) extends RetryDetails { + val givingUp: Boolean = false + val upcomingDelay: Option[FiniteDuration] = Some(nextDelay) + } +} + +package object retry_ { + @deprecated("Use retryingOnFailures instead", "2.1.0") + def retryingM[A] = new RetryingOnFailuresPartiallyApplied[A] + def retryingOnFailures[A] = new RetryingOnFailuresPartiallyApplied[A] + + private def retryingOnFailuresImpl[M[_], A]( + policy: RetryPolicy[M], + wasSuccessful: A => M[Boolean], + onFailure: (A, RetryDetails) => M[Unit], + status: RetryStatus, + a: A + )(implicit M: Monad[M], S: Sleep[M]): M[Either[RetryStatus, A]] = { + + def onFalse: M[Either[RetryStatus, A]] = for { + nextStep <- applyPolicy(policy, status) + _ <- onFailure(a, buildRetryDetails(status, nextStep)) + result <- nextStep match { + case NextStep.RetryAfterDelay(delay, updatedStatus) => + S.sleep(delay) *> + M.pure(Left(updatedStatus)) // continue recursion + case NextStep.GiveUp => + M.pure(Right(a)) // stop the recursion + } + } yield result + + wasSuccessful(a).ifM( + M.pure(Right(a)), // stop the recursion + onFalse + ) + } + + private[retry] class RetryingOnFailuresPartiallyApplied[A] { + def apply[M[_]]( + policy: RetryPolicy[M], + wasSuccessful: A => M[Boolean], + onFailure: (A, RetryDetails) => M[Unit] + )( + action: => M[A] + )(implicit M: Monad[M], S: Sleep[M]): M[A] = M.tailRecM(RetryStatus.NoRetriesYet) { + status => + action.flatMap { a => + retryingOnFailuresImpl(policy, wasSuccessful, onFailure, status, a) + } + } + } + + def retryingOnSomeErrors[A] = new RetryingOnSomeErrorsPartiallyApplied[A] + + private def retryingOnSomeErrorsImpl[M[_], A, E]( + policy: RetryPolicy[M], + isWorthRetrying: E => M[Boolean], + onError: (E, RetryDetails) => M[Unit], + status: RetryStatus, + attempt: Either[E, A] + )(implicit ME: MonadError[M, E], S: Sleep[M]): M[Either[RetryStatus, A]] = attempt match { + case Left(error) => + isWorthRetrying(error).ifM( + for { + nextStep <- applyPolicy(policy, status) + _ <- onError(error, buildRetryDetails(status, nextStep)) + result <- nextStep match { + case NextStep.RetryAfterDelay(delay, updatedStatus) => + S.sleep(delay) *> + ME.pure(Left(updatedStatus)) // continue recursion + case NextStep.GiveUp => + ME.raiseError[A](error).map(Right(_)) // stop the recursion + } + } yield result, + ME.raiseError[A](error).map(Right(_)) // stop the recursion + ) + case Right(success) => + ME.pure(Right(success)) // stop the recursion + } + + private[retry] class RetryingOnSomeErrorsPartiallyApplied[A] { + def apply[M[_], E]( + policy: RetryPolicy[M], + isWorthRetrying: E => M[Boolean], + onError: (E, RetryDetails) => M[Unit] + )( + action: => M[A] + )(implicit ME: MonadError[M, E], S: Sleep[M]): M[A] = + ME.tailRecM(RetryStatus.NoRetriesYet) { status => + ME.attempt(action).flatMap { attempt => + retryingOnSomeErrorsImpl( + policy, + isWorthRetrying, + onError, + status, + attempt + ) + } + } + } + + def retryingOnAllErrors[A] = new RetryingOnAllErrorsPartiallyApplied[A] + + private[retry] class RetryingOnAllErrorsPartiallyApplied[A] { + def apply[M[_], E]( + policy: RetryPolicy[M], + onError: (E, RetryDetails) => M[Unit] + )( + action: => M[A] + )(implicit ME: MonadError[M, E], S: Sleep[M]): M[A] = + retryingOnSomeErrors[A].apply[M, E](policy, _ => ME.pure(true), onError)( + action + ) + } + + def retryingOnFailuresAndSomeErrors[A] = + new RetryingOnFailuresAndSomeErrorsPartiallyApplied[A] + + private[retry] class RetryingOnFailuresAndSomeErrorsPartiallyApplied[A] { + def apply[M[_], E]( + policy: RetryPolicy[M], + wasSuccessful: A => M[Boolean], + isWorthRetrying: E => M[Boolean], + onFailure: (A, RetryDetails) => M[Unit], + onError: (E, RetryDetails) => M[Unit] + )( + action: => M[A] + )(implicit ME: MonadError[M, E], S: Sleep[M]): M[A] = { + + ME.tailRecM(RetryStatus.NoRetriesYet) { status => + ME.attempt(action).flatMap { + case Right(a) => + retryingOnFailuresImpl(policy, wasSuccessful, onFailure, status, a) + case attempt => + retryingOnSomeErrorsImpl( + policy, + isWorthRetrying, + onError, + status, + attempt + ) + } + } + } + } + + def retryingOnFailuresAndAllErrors[A] = + new RetryingOnFailuresAndAllErrorsPartiallyApplied[A] + + private[retry] class RetryingOnFailuresAndAllErrorsPartiallyApplied[A] { + def apply[M[_], E]( + policy: RetryPolicy[M], + wasSuccessful: A => M[Boolean], + onFailure: (A, RetryDetails) => M[Unit], + onError: (E, RetryDetails) => M[Unit] + )( + action: => M[A] + )(implicit ME: MonadError[M, E], S: Sleep[M]): M[A] = + retryingOnFailuresAndSomeErrors[A].apply[M, E]( + policy, + wasSuccessful, + _ => ME.pure(true), + onFailure, + onError + )( + action + ) + } + + def noop[M[_]: Monad, A]: (A, RetryDetails) => M[Unit] = + (_, _) => Monad[M].pure(()) + + private[retry] def applyPolicy[M[_]: Monad]( + policy: RetryPolicy[M], + retryStatus: RetryStatus + ): M[NextStep] = + policy.decideNextRetry(retryStatus).map { + case PolicyDecision.DelayAndRetry(delay) => + NextStep.RetryAfterDelay(delay, retryStatus.addRetry(delay)) + case PolicyDecision.GiveUp => + NextStep.GiveUp + } + + private[retry] def buildRetryDetails( + currentStatus: RetryStatus, + nextStep: NextStep + ): RetryDetails = + nextStep match { + case NextStep.RetryAfterDelay(delay, _) => + RetryDetails.WillDelayAndRetry( + delay, + currentStatus.retriesSoFar, + currentStatus.cumulativeDelay + ) + case NextStep.GiveUp => + RetryDetails.GivingUp( + currentStatus.retriesSoFar, + currentStatus.cumulativeDelay + ) + } + + private[retry] sealed trait NextStep + + private[retry] object NextStep { + case object GiveUp extends NextStep + + final case class RetryAfterDelay( + delay: FiniteDuration, + updatedStatus: RetryStatus + ) extends NextStep + } +} + +trait AllSyntax extends RetrySyntax + +trait RetrySyntax { + implicit final def retrySyntaxBase[M[_], A]( + action: => M[A] + ): RetryingOps[M, A] = + new RetryingOps[M, A](action) + + implicit final def retrySyntaxError[M[_], A, E]( + action: => M[A] + )(implicit M: MonadError[M, E]): RetryingErrorOps[M, A, E] = + new RetryingErrorOps[M, A, E](action) +} + +final class RetryingOps[M[_], A](action: => M[A]) { + @deprecated("Use retryingOnFailures instead", "2.1.0") + def retryingM[E]( + wasSuccessful: A => M[Boolean], + policy: RetryPolicy[M], + onFailure: (A, RetryDetails) => M[Unit] + )(implicit M: Monad[M], S: Sleep[M]): M[A] = + retryingOnFailures(wasSuccessful, policy, onFailure) + + def retryingOnFailures[E]( + wasSuccessful: A => M[Boolean], + policy: RetryPolicy[M], + onFailure: (A, RetryDetails) => M[Unit] + )(implicit M: Monad[M], S: Sleep[M]): M[A] = + retry_.retryingOnFailures( + policy = policy, + wasSuccessful = wasSuccessful, + onFailure = onFailure + )(action) +} + +final class RetryingErrorOps[M[_], A, E](action: => M[A])(implicit M: MonadError[M, E]) { + def retryingOnAllErrors( + policy: RetryPolicy[M], + onError: (E, RetryDetails) => M[Unit] + )(implicit S: Sleep[M]): M[A] = + retry_.retryingOnAllErrors( + policy = policy, + onError = onError + )(action) + + def retryingOnSomeErrors( + isWorthRetrying: E => M[Boolean], + policy: RetryPolicy[M], + onError: (E, RetryDetails) => M[Unit] + )(implicit S: Sleep[M]): M[A] = + retry_.retryingOnSomeErrors( + policy = policy, + isWorthRetrying = isWorthRetrying, + onError = onError + )(action) + + def retryingOnFailuresAndAllErrors( + wasSuccessful: A => M[Boolean], + policy: RetryPolicy[M], + onFailure: (A, RetryDetails) => M[Unit], + onError: (E, RetryDetails) => M[Unit] + )(implicit S: Sleep[M]): M[A] = + retry_.retryingOnFailuresAndAllErrors( + policy = policy, + wasSuccessful = wasSuccessful, + onFailure = onFailure, + onError = onError + )(action) + + def retryingOnFailuresAndSomeErrors( + wasSuccessful: A => M[Boolean], + isWorthRetrying: E => M[Boolean], + policy: RetryPolicy[M], + onFailure: (A, RetryDetails) => M[Unit], + onError: (E, RetryDetails) => M[Unit] + )(implicit S: Sleep[M]): M[A] = + retry_.retryingOnFailuresAndSomeErrors( + policy = policy, + wasSuccessful = wasSuccessful, + isWorthRetrying = isWorthRetrying, + onFailure = onFailure, + onError = onError + )(action) +} + +object Fibonacci { + def fibonacci(n: Int): Long = { + if (n > 0) + fib(n)._1 + else + 0 + } + + // "Fast doubling" Fibonacci algorithm. + // See e.g. http://funloop.org/post/2017-04-14-computing-fibonacci-numbers.html for explanation. + private def fib(n: Int): (Long, Long) = n match { + case 0 => (0, 1) + case m => + val (a, b) = fib(m / 2) + val c = a * (b * 2 - a) + val d = a * a + b * b + if (n % 2 == 0) + (c, d) + else + (d, c + d) + } +} + + diff --git a/std/shared/src/main/scala/cats/effect/std/Retry.scala b/std/shared/src/main/scala/cats/effect/std/Retry.scala new file mode 100644 index 0000000000..24e39c03e3 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/Retry.scala @@ -0,0 +1,305 @@ +package cats.effect.std + +import cats._ +import cats.syntax.all._ +import cats.effect.kernel.Temporal +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Random +import java.util.concurrent.TimeUnit + +abstract class Retry[F[_]] { + def nextRetry(status: Retry.Status, error: Throwable): F[Retry.Decision] + + def followedBy(r: Retry[F]): Retry[F] + + /** + * Combine this schedule with another schedule, retrying when both schedules want to retry + * and choosing the maximum of the two delays. + */ + def and(r: Retry[F]): Retry[F] + + /** + * Combine this schedule with another schedule, retrying when either schedule wants to retry + * and choosing the minimum of the two delays when both schedules want to retry. + */ + def or(r: Retry[F]): Retry[F] + + def mapDelay(f: FiniteDuration => FiniteDuration): Retry[F] + + def flatMapDelay(f: FiniteDuration => F[FiniteDuration]): Retry[F] + + /** + * Set an upper bound on any individual delay produced by the given Retry. + */ + def capDelay(cap: FiniteDuration): Retry[F] + + def limitRetries(maxRetries: Int): Retry[F] + + /** + * Add an upper bound to a Retry such that once the given time-delay amount per try + * has been reached or exceeded, the Retry will stop retrying and give up. If you need to + * stop retrying once cumulative delay reaches a time-delay amount, use + * [[limitRetriesByCumulativeDelay]]. + */ + def limitRetriesByDelay(threshold: FiniteDuration): Retry[F] + + /** + * Add an upperbound to a Retry such that once the cumulative delay over all retries has + * reached or exceeded the given limit, the Retry will stop retrying and give up. + */ + def limitRetriesByCumulativeDelay(threshold: FiniteDuration): Retry[F] + + def selectError(f: Throwable => Boolean): Retry[F] + + def selectErrorWith(f: Throwable => F[Boolean]): Retry[F] + + def flatTap(f: (Throwable, Retry.Decision, Retry.Status) => F[Unit]): Retry[F] + + def mapK[G[_]: Monad](f: F ~> G): Retry[G] +} +object Retry { + final case class Status( + retriesSoFar: Int, + cumulativeDelay: FiniteDuration, + previousDelay: Option[FiniteDuration] + ) { + def addRetry(delay: FiniteDuration) = Retry.Status( + retriesSoFar = retriesSoFar + 1, + cumulativeDelay = cumulativeDelay + delay, + previousDelay = delay.some + ) + } + + sealed trait Decision + object Decision { + case object GiveUp extends Decision + final case class DelayAndRetry(delay: FiniteDuration) extends Decision + } + + import Decision._ + + val noRetriesYet = Retry.Status(0, Duration.Zero, None) + + def retry[F[_]: Temporal, A](r: Retry[F], action: F[A]): F[A] = { + def loop(status: Retry.Status): F[A] = + action.handleErrorWith { error => + r + .nextRetry(status, error) + .flatMap { + case DelayAndRetry(delay) => + Temporal[F].sleep(delay) >> loop(status.addRetry(delay)) + case GiveUp => + Temporal[F].raiseError[A](error) + } + } + + loop(noRetriesYet) + } + + def apply[F[_]: Monad]( + nextRetry: (Retry.Status, Throwable) => F[Retry.Decision] + ): Retry[F] = + new RetryImpl[F](nextRetry) + + def liftF[F[_]: Monad]( + nextRetry: Retry.Status => F[Retry.Decision] + ): Retry[F] = + Retry((status, _) => nextRetry(status)) + + def lift[F[_]: Monad]( + nextRetry: Retry.Status => Retry.Decision + ): Retry[F] = + liftF[F](status => nextRetry(status).pure[F]) + + /** + * Don't retry at all and always give up. Only really useful for combining with other + * policies. + */ + def alwaysGiveUp[F[_]: Monad]: Retry[F] = + Retry.lift[F](_ => GiveUp) + + /** + * Delay by a constant amount before each retry. Never give up. + */ + def constantDelay[F[_]: Monad](delay: FiniteDuration): Retry[F] = + Retry.lift[F](_ => DelayAndRetry(delay)) + + /** + * Each delay is twice as long as the previous one. Never give up. + */ + def exponentialBackoff[F[_]: Monad]( + baseDelay: FiniteDuration + ): Retry[F] = + Retry.lift[F] { status => + val delay = safeMultiply(baseDelay, Math.pow(2, status.retriesSoFar.toDouble).toLong) + DelayAndRetry(delay) + } + + /** + * Delay(n) = Delay(n - 2) + Delay(n - 1) + * + * e.g. if `baseDelay` is 10 milliseconds, the delays before each retry will be 10 ms, 10 ms, + * 20 ms, 30ms, 50ms, 80ms, 130ms, ... + */ + def fibonacciBackoff[F[_]: Monad]( + baseDelay: FiniteDuration + ): Retry[F] = { + // "Fast doubling" Fibonacci algorithm. + // See e.g. http://funloop.org/post/2017-04-14-computing-fibonacci-numbers.html for explanation. + def fib(n: Int): (Long, Long) = n match { + case 0 => (0, 1) + case m => + val (a, b) = fib(m / 2) + val c = a * (b * 2 - a) + val d = a * a + b * b + if (n % 2 == 0) + (c, d) + else + (d, c + d) + } + + // TODO + // this can probably be eliminated (after tests) since it only + // exists for that > 0 test, which is a condition that cannot + // happen at use site + def fibonacci(n: Int): Long = { + if (n > 0) + fib(n)._1 + else + 0 + } + + Retry.lift[F] { status => + val delay = + safeMultiply(baseDelay, fibonacci(status.retriesSoFar + 1)) + DelayAndRetry(delay) + } + } + + /** + * "Full jitter" backoff algorithm. See + * https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + */ + def fullJitter[F[_]: Monad](baseDelay: FiniteDuration): Retry[F] = + Retry.lift[F] { status => + val e = Math.pow(2, status.retriesSoFar.toDouble).toLong + val maxDelay = safeMultiply(baseDelay, e) + val delayNanos = (maxDelay.toNanos * Random.nextDouble()).toLong + DelayAndRetry(new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS)) + } + + /* + * Multiply the given duration by the given multiplier, but cap the result to + * ensure we don't try to create a FiniteDuration longer than 2^63 - 1 nanoseconds. + */ + private def safeMultiply( + duration: FiniteDuration, + multiplier: Long + ): FiniteDuration = { + assert(multiplier > 0, "Don't use a negative delay") + val longMax: BigInt = BigInt(Long.MaxValue) + val durationNanos = BigInt(duration.toNanos) + val resultNanos = durationNanos * BigInt(multiplier) + val safeResultNanos = resultNanos min longMax + FiniteDuration(safeResultNanos.toLong, TimeUnit.NANOSECONDS) + } + + private final case class RetryImpl[F[_]: Monad](nextRetry_ : (Retry.Status, Throwable) => F[Retry.Decision]) extends Retry[F] { + + def nextRetry(status: Retry.Status, error: Throwable): F[Retry.Decision] = + nextRetry_(status, error) + + def followedBy(r: Retry[F]) = Retry { (status, error) => + (nextRetry(status, error), r.nextRetry(status, error)).mapN { + case (GiveUp, decision) => decision + case (decision, _) => decision + } + } + + def and(r: Retry[F]) = Retry[F] { (status, error) => + (nextRetry(status, error), r.nextRetry(status, error)).mapN { + case (DelayAndRetry(t1), DelayAndRetry(t2)) => DelayAndRetry(t1 max t2) + case _ => GiveUp + } + } + + + def or(r: Retry[F]) = Retry { (status, error) => + (nextRetry(status, error), r.nextRetry(status, error)).mapN { + case (DelayAndRetry(t1), DelayAndRetry(t2)) => DelayAndRetry(t1 min t2) + case (retrying @ DelayAndRetry(_), GiveUp) => retrying + case (GiveUp, retrying @ DelayAndRetry(_)) => retrying + case _ => GiveUp + } + } + + def mapDelay(f: FiniteDuration => FiniteDuration) = Retry { (status, error) => + nextRetry(status, error).map { + case GiveUp => GiveUp + case DelayAndRetry(delay) => DelayAndRetry(f(delay)) + } + } + + def flatMapDelay(f: FiniteDuration => F[FiniteDuration]) = Retry { (status, error) => + nextRetry(status, error).flatMap { + case GiveUp => GiveUp.pure[F].widen[Retry.Decision] + case DelayAndRetry(delay) => f(delay).map(DelayAndRetry(_)) + } + } + + def capDelay(cap: FiniteDuration): Retry[F] = + mapDelay(delay => delay.min(cap)) + + def limitRetries(maxRetries: Int): Retry[F] = Retry { (status, error) => + if (status.retriesSoFar >= maxRetries) GiveUp.pure[F].widen[Decision] + else nextRetry(status, error) + } + + def limitRetriesByDelay(threshold: FiniteDuration) = Retry { (status, error) => + nextRetry(status, error).map { + case retrying @ DelayAndRetry(delay) => + if (delay > threshold) GiveUp else retrying + case GiveUp => GiveUp + } + } + + def limitRetriesByCumulativeDelay(threshold: FiniteDuration) = + Retry { (status, error) => + nextRetry(status, error).map { + case retrying @ DelayAndRetry(delay) => + if (status.cumulativeDelay + delay >= threshold) GiveUp + else retrying + case GiveUp => GiveUp + } + } + + def selectError(f: Throwable => Boolean): Retry[F] = + Retry { (status, error) => + if(f(error)) nextRetry(status, error) + else GiveUp.pure[F].widen[Decision] + } + + def selectErrorWith(f: Throwable => F[Boolean]): Retry[F] = + Retry { (status, error) => + f(error).flatMap { + case true => nextRetry(status, error) + case false => GiveUp.pure[F].widen[Decision] + } + } + + def flatTap(f: (Throwable, Retry.Decision, Retry.Status) => F[Unit]): Retry[F] = + Retry { (status, error) => + nextRetry(status, error).flatTap { + case decision @ DelayAndRetry(delay) => + f(error, decision, status.addRetry(delay)) + case decision @ GiveUp => + f(error, decision, status) + } + } + + def mapK[G[_]: Monad](f: F ~> G): Retry[G] = + Retry((status, error) => f(nextRetry(status, error))) + + override def toString: String = "Retry(...)" + } + } diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala index 2636bab87f..026e36f570 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/AllSyntax.scala @@ -17,4 +17,4 @@ package cats.effect package std.syntax -trait AllSyntax extends BackpressureSyntax with SupervisorSyntax +trait AllSyntax extends BackpressureSyntax with SupervisorSyntax with RetrySyntax diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala new file mode 100644 index 0000000000..a9315c5b92 --- /dev/null +++ b/std/shared/src/main/scala/cats/effect/std/syntax/RetrySyntax.scala @@ -0,0 +1,15 @@ +package cats.effect.std.syntax + +import cats.effect.kernel.Temporal +import cats.effect.std.Retry + +trait RetrySyntax { + implicit def retryOps[F[_], A](wrapped: F[A]): RetryOps[F, A] = + new RetryOps(wrapped) +} + +final class RetryOps[F[_], A] private[syntax] (private[syntax] val wrapped: F[A]) + extends AnyVal { + def retry(r: Retry[F])(implicit F: Temporal[F]): F[A] = + Retry.retry(r, wrapped) +} diff --git a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala index 8016cca34e..a495739430 100644 --- a/std/shared/src/main/scala/cats/effect/std/syntax/package.scala +++ b/std/shared/src/main/scala/cats/effect/std/syntax/package.scala @@ -22,4 +22,8 @@ package object syntax { object supervisor extends SupervisorSyntax + object retry extends RetrySyntax + + object backpressure extends BackpressureSyntax + } diff --git a/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala new file mode 100644 index 0000000000..784cd36a44 --- /dev/null +++ b/tests/shared/src/test/scala/cats/effect/std/RetrySpec.scala @@ -0,0 +1,971 @@ +// package cats.effect.std + +// import cats.{Id, catsInstancesForId} +// import org.scalatest.flatspec.AnyFlatSpec +// import retry.syntax.all._ + +// import scala.collection.mutable.ArrayBuffer +// import scala.concurrent.duration._ + +// class SyntaxSpec extends AnyFlatSpec { +// type StringOr[A] = Either[String, A] + +// behavior of "retryingOnFailures" + +// it should "retry until the action succeeds" in new TestContext { +// val policy: RetryPolicy[Id] = +// RetryPolicies.constantDelay[Id](1.second) +// def onFailure: (String, RetryDetails) => Id[Unit] = onError +// def wasSuccessful(res: String): Id[Boolean] = res.toInt > 3 +// val sleeps = ArrayBuffer.empty[FiniteDuration] + +// implicit val dummySleep: Sleep[Id] = +// (delay: FiniteDuration) => sleeps.append(delay) + +// def action: Id[String] = { +// attempts = attempts + 1 +// attempts.toString +// } + +// val finalResult: Id[String] = +// action.retryingOnFailures(wasSuccessful, policy, onFailure) + +// assert(finalResult == "4") +// assert(attempts == 4) +// assert(errors.toList == List("1", "2", "3")) +// assert(delays.toList == List(1.second, 1.second, 1.second)) +// assert(sleeps.toList == delays.toList) +// assert(!gaveUp) +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// val policy: RetryPolicy[Id] = RetryPolicies.limitRetries[Id](2) +// implicit val dummySleep: Sleep[Id] = _ => () + +// def action: Id[String] = { +// attempts = attempts + 1 +// attempts.toString +// } + +// val finalResult: Id[String] = +// action.retryingOnFailures(_.toInt > 3, policy, onError) + +// assert(finalResult == "3") +// assert(attempts == 3) +// assert(errors.toList == List("1", "2", "3")) +// assert(delays.toList == List(Duration.Zero, Duration.Zero)) +// assert(gaveUp) +// } + +// behavior of "retryingOnSomeErrors" + +// it should "retry until the action succeeds" in new TestContext { +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// val policy: RetryPolicy[StringOr] = +// RetryPolicies.constantDelay[StringOr](1.second) + +// def action: StringOr[String] = { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// val finalResult: StringOr[String] = +// action.retryingOnSomeErrors( +// s => Right(s == "one more time"), +// policy, +// (err, rd) => onError(err, rd) +// ) + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry only if the error is worth retrying" in new TestContext { +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// val policy: RetryPolicy[StringOr] = +// RetryPolicies.constantDelay[StringOr](1.second) + +// def action: StringOr[Nothing] = { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Left("nope") +// } + +// val finalResult = +// action.retryingOnSomeErrors( +// s => Right(s == "one more time"), +// policy, +// (err, rd) => onError(err, rd) +// ) + +// assert(finalResult == Left("nope")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert( +// !gaveUp +// ) // false because onError is only called when the error is worth retrying +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// val policy: RetryPolicy[StringOr] = +// RetryPolicies.limitRetries[StringOr](2) + +// def action: StringOr[Nothing] = { +// attempts = attempts + 1 + +// Left("one more time") +// } + +// val finalResult: StringOr[Nothing] = +// action.retryingOnSomeErrors( +// s => Right(s == "one more time"), +// policy, +// (err, rd) => onError(err, rd) +// ) + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// behavior of "retryingOnAllErrors" + +// it should "retry until the action succeeds" in new TestContext { +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// val policy: RetryPolicy[StringOr] = +// RetryPolicies.constantDelay[StringOr](1.second) + +// def action: StringOr[String] = { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// val finalResult: StringOr[String] = +// action.retryingOnAllErrors(policy, (err, rd) => onError(err, rd)) + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// val policy: RetryPolicy[StringOr] = +// RetryPolicies.limitRetries[StringOr](2) + +// def action: StringOr[Nothing] = { +// attempts = attempts + 1 +// Left("one more time") +// } + +// val finalResult = +// action.retryingOnAllErrors(policy, (err, rd) => onError(err, rd)) + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// private class TestContext { +// var attempts = 0 +// val errors = ArrayBuffer.empty[String] +// val delays = ArrayBuffer.empty[FiniteDuration] +// var gaveUp = false + +// def onError(error: String, details: RetryDetails): Either[String, Unit] = { +// errors.append(error) +// details match { +// case RetryDetails.WillDelayAndRetry(delay, _, _) => delays.append(delay) +// case RetryDetails.GivingUp(_, _) => gaveUp = true +// } +// Right(()) +// } +// } +// } + +// import org.scalatest.flatspec.AnyFlatSpec + +// class FibonacciSpec extends AnyFlatSpec { +// it should "calculate the Fibonacci sequence" in { +// assert(Fibonacci.fibonacci(0) == 0) +// assert(Fibonacci.fibonacci(1) == 1) +// assert(Fibonacci.fibonacci(2) == 1) +// assert(Fibonacci.fibonacci(3) == 2) +// assert(Fibonacci.fibonacci(4) == 3) +// assert(Fibonacci.fibonacci(5) == 5) +// assert(Fibonacci.fibonacci(6) == 8) +// assert(Fibonacci.fibonacci(7) == 13) +// assert(Fibonacci.fibonacci(75) == 2111485077978050L) +// } +// } + +// import cats.{Id, catsInstancesForId} +// import org.scalatest.flatspec.AnyFlatSpec + +// import scala.collection.mutable.ArrayBuffer +// import scala.concurrent.duration._ + +// class PackageObjectSpec extends AnyFlatSpec { +// type StringOr[A] = Either[String, A] + +// implicit val sleepForEither: Sleep[StringOr] = _ => Right(()) + +// behavior of "retryingOnFailures" + +// it should "retry until the action succeeds" in new TestContext { +// val policy = RetryPolicies.constantDelay[Id](1.second) + +// val sleeps = ArrayBuffer.empty[FiniteDuration] + +// implicit val dummySleep: Sleep[Id] = +// (delay: FiniteDuration) => sleeps.append(delay) + +// val finalResult = retryingOnFailures[String][Id]( +// policy, +// _.toInt > 3, +// onError +// ) { +// attempts = attempts + 1 +// attempts.toString +// } + +// assert(finalResult == "4") +// assert(attempts == 4) +// assert(errors.toList == List("1", "2", "3")) +// assert(delays.toList == List(1.second, 1.second, 1.second)) +// assert(sleeps.toList == delays.toList) +// assert(!gaveUp) +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// val policy = RetryPolicies.limitRetries[Id](2) + +// implicit val dummySleep: Sleep[Id] = _ => () + +// val finalResult = retryingOnFailures[String][Id]( +// policy, +// _.toInt > 3, +// onError +// ) { +// attempts = attempts + 1 +// attempts.toString +// } + +// assert(finalResult == "3") +// assert(attempts == 3) +// assert(errors.toList == List("1", "2", "3")) +// assert(delays.toList == List(Duration.Zero, Duration.Zero)) +// assert(gaveUp) +// } + +// it should "retry in a stack-safe way" in new TestContext { +// val policy = RetryPolicies.limitRetries[Id](10000) + +// implicit val dummySleep: Sleep[Id] = _ => () + +// val finalResult = retryingOnFailures[String][Id]( +// policy, +// _.toInt > 20000, +// onError +// ) { +// attempts = attempts + 1 +// attempts.toString +// } + +// assert(finalResult == "10001") +// assert(attempts == 10001) +// assert(gaveUp) +// } + +// behavior of "retryingOnSomeErrors" + +// it should "retry until the action succeeds" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnSomeErrors( +// policy, +// (s: String) => Right(s == "one more time"), +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry only if the error is worth retrying" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnSomeErrors( +// policy, +// (s: String) => Right(s == "one more time"), +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Left("nope") +// } + +// assert(finalResult == Left("nope")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert( +// !gaveUp +// ) // false because onError is only called when the error is worth retrying +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnSomeErrors( +// policy, +// (s: String) => Right(s == "one more time"), +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// it should "retry in a stack-safe way" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnSomeErrors( +// policy, +// (s: String) => Right(s == "one more time"), +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 10001) +// assert(gaveUp) +// } + +// behavior of "retryingOnAllErrors" + +// it should "retry until the action succeeds" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnAllErrors( +// policy, +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry until the policy chooses to give up" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnAllErrors( +// policy, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// it should "retry in a stack-safe way" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnAllErrors( +// policy, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 10001) +// assert(gaveUp) +// } + +// behavior of "retryingOnFailuresAndSomeErrors" + +// it should "retry until the action succeeds" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "yay"), +// (s: String) => Right(s == "one more time"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry only if the error is worth retrying" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "will never happen"), +// (s: String) => Right(s == "one more time"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Left("nope") +// } + +// assert(finalResult == Left("nope")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert( +// !gaveUp +// ) // false because onError is only called when the error is worth retrying +// } + +// it should "retry until the policy chooses to give up due to errors" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "will never happen"), +// (s: String) => Right(s == "one more time"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// it should "retry until the policy chooses to give up due to failures" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "yay"), +// (s: String) => Right(s == "one more time"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Right("boo") +// } + +// assert(finalResult == Right("boo")) +// assert(attempts == 3) +// assert(errors.toList == List("boo", "boo", "boo")) +// assert(gaveUp) +// } + +// it should "retry in a stack-safe way" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "yay"), +// (s: String) => Right(s == "one more time"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 10001) +// assert(gaveUp) +// } + +// it should "should fail fast if isWorthRetrying's effect fails" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnFailuresAndSomeErrors[String]( +// policy, +// s => Right(s == "yay, but it doesn't matter"), +// (_: String) => Left("isWorthRetrying failed"): StringOr[Boolean], +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("isWorthRetrying failed")) +// assert(attempts == 1) +// assert(!gaveUp) +// } + +// behavior of "retryingOnFailuresAndAllErrors" + +// it should "retry until the action succeeds" in new TestContext { +// val policy = RetryPolicies.constantDelay[StringOr](1.second) + +// val finalResult = retryingOnFailuresAndAllErrors[String]( +// policy, +// s => Right(s == "yay"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// if (attempts < 3) +// Left("one more time") +// else +// Right("yay") +// } + +// assert(finalResult == Right("yay")) +// assert(attempts == 3) +// assert(errors.toList == List("one more time", "one more time")) +// assert(!gaveUp) +// } + +// it should "retry until the policy chooses to give up due to errors" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnFailuresAndAllErrors[String]( +// policy, +// s => Right(s == "will never happen"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 3) +// assert( +// errors.toList == List("one more time", "one more time", "one more time") +// ) +// assert(gaveUp) +// } + +// it should "retry until the policy chooses to give up due to failures" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](2) + +// val finalResult = retryingOnFailuresAndAllErrors[String]( +// policy, +// s => Right(s == "yay"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Right("boo") +// } + +// assert(finalResult == Right("boo")) +// assert(attempts == 3) +// assert(errors.toList == List("boo", "boo", "boo")) +// assert(gaveUp) +// } + +// it should "retry in a stack-safe way" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnFailuresAndAllErrors[String]( +// policy, +// s => Right(s == "will never happen"), +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Left("one more time") +// } + +// assert(finalResult == Left("one more time")) +// assert(attempts == 10001) +// assert(gaveUp) +// } + +// it should "should fail fast if wasSuccessful's effect fails" in new TestContext { +// val policy = RetryPolicies.limitRetries[StringOr](10000) + +// val finalResult = retryingOnFailuresAndAllErrors[String]( +// policy, +// _ => Left("an error was raised!"): StringOr[Boolean], +// onError, +// onError +// ) { +// attempts = attempts + 1 +// Right("one more time") +// } + +// assert(finalResult == Left("an error was raised!")) +// assert(attempts == 1) +// assert(!gaveUp) +// } + +// private class TestContext { +// var attempts = 0 +// val errors = ArrayBuffer.empty[String] +// val delays = ArrayBuffer.empty[FiniteDuration] +// var gaveUp = false + +// def onError(error: String, details: RetryDetails): Either[String, Unit] = { +// errors.append(error) +// details match { +// case RetryDetails.WillDelayAndRetry(delay, _, _) => delays.append(delay) +// case RetryDetails.GivingUp(_, _) => gaveUp = true +// } +// Right(()) +// } +// } +// } + +// import java.util.concurrent.TimeUnit + +// import retry.RetryPolicies._ +// import cats.{Id, catsInstancesForId} +// import org.scalacheck.{Arbitrary, Gen} +// import org.scalatest.flatspec.AnyFlatSpec +// import org.scalatestplus.scalacheck.Checkers +// import retry.PolicyDecision.{DelayAndRetry, GiveUp} + +// import scala.concurrent.duration._ + +// class RetryPoliciesSpec extends AnyFlatSpec with Checkers { +// override implicit val generatorDrivenConfig: PropertyCheckConfiguration = +// PropertyCheckConfiguration(minSuccessful = 100) + +// implicit val arbRetryStatus: Arbitrary[RetryStatus] = Arbitrary { +// for { +// a <- Gen.choose(0, 1000) +// b <- Gen.choose(0, 1000) +// c <- Gen.option(Gen.choose(b, 10000)) +// } yield RetryStatus( +// a, +// FiniteDuration(b, TimeUnit.MILLISECONDS), +// c.map(FiniteDuration(_, TimeUnit.MILLISECONDS)) +// ) +// } + +// val genFiniteDuration: Gen[FiniteDuration] = +// Gen.posNum[Long].map(FiniteDuration(_, TimeUnit.NANOSECONDS)) + +// case class LabelledRetryPolicy(policy: RetryPolicy[Id], description: String) { +// override def toString: String = description +// } + +// implicit val arbRetryPolicy: Arbitrary[LabelledRetryPolicy] = Arbitrary { +// Gen.oneOf( +// Gen.const(LabelledRetryPolicy(alwaysGiveUp[Id], "alwaysGiveUp")), +// genFiniteDuration.map(delay => +// LabelledRetryPolicy( +// constantDelay[Id](delay), +// s"constantDelay($delay)" +// ) +// ), +// genFiniteDuration.map(baseDelay => +// LabelledRetryPolicy( +// exponentialBackoff[Id](baseDelay), +// s"exponentialBackoff($baseDelay)" +// ) +// ), +// Gen +// .posNum[Int] +// .map(maxRetries => +// LabelledRetryPolicy( +// limitRetries(maxRetries), +// s"limitRetries($maxRetries)" +// ) +// ), +// genFiniteDuration.map(baseDelay => +// LabelledRetryPolicy( +// fibonacciBackoff[Id](baseDelay), +// s"fibonacciBackoff($baseDelay)" +// ) +// ), +// genFiniteDuration.map(baseDelay => +// LabelledRetryPolicy( +// fullJitter[Id](baseDelay), +// s"fullJitter($baseDelay)" +// ) +// ) +// ) +// } + +// behavior of "constantDelay" + +// it should "always retry with the same delay" in check { +// (status: RetryStatus) => +// constantDelay[Id](1.second) +// .decideNextRetry(status) == PolicyDecision.DelayAndRetry(1.second) +// } + +// behavior of "exponentialBackoff" + +// it should "start with the base delay and double the delay after each iteration" in { +// val policy = exponentialBackoff[Id](100.milliseconds) +// val arbitraryCumulativeDelay = 999.milliseconds +// val arbitraryPreviousDelay = Some(999.milliseconds) + +// def test(retriesSoFar: Int, expectedDelay: FiniteDuration) = { +// val status = RetryStatus( +// retriesSoFar, +// arbitraryCumulativeDelay, +// arbitraryPreviousDelay +// ) +// val verdict = policy.decideNextRetry(status) +// assert(verdict == PolicyDecision.DelayAndRetry(expectedDelay)) +// } + +// test(0, 100.milliseconds) +// test(1, 200.milliseconds) +// test(2, 400.milliseconds) +// test(3, 800.milliseconds) +// } + +// behavior of "fibonacciBackoff" + +// it should "start with the base delay and increase the delay in a Fibonacci-y way" in { +// val policy = fibonacciBackoff[Id](100.milliseconds) +// val arbitraryCumulativeDelay = 999.milliseconds +// val arbitraryPreviousDelay = Some(999.milliseconds) + +// def test(retriesSoFar: Int, expectedDelay: FiniteDuration) = { +// val status = RetryStatus( +// retriesSoFar, +// arbitraryCumulativeDelay, +// arbitraryPreviousDelay +// ) +// val verdict = policy.decideNextRetry(status) +// assert(verdict == PolicyDecision.DelayAndRetry(expectedDelay)) +// } + +// test(0, 100.milliseconds) +// test(1, 100.milliseconds) +// test(2, 200.milliseconds) +// test(3, 300.milliseconds) +// test(4, 500.milliseconds) +// test(5, 800.milliseconds) +// test(6, 1300.milliseconds) +// test(7, 2100.milliseconds) +// } + +// behavior of "fullJitter" + +// it should "implement the AWS Full Jitter backoff algorithm" in { +// val policy = fullJitter[Id](100.milliseconds) +// val arbitraryCumulativeDelay = 999.milliseconds +// val arbitraryPreviousDelay = Some(999.milliseconds) + +// def test(retriesSoFar: Int, expectedMaximumDelay: FiniteDuration): Unit = { +// val status = RetryStatus( +// retriesSoFar, +// arbitraryCumulativeDelay, +// arbitraryPreviousDelay +// ) +// for (_ <- 1 to 1000) { +// val verdict = policy.decideNextRetry(status) +// val delay = verdict.asInstanceOf[PolicyDecision.DelayAndRetry].delay +// assert(delay >= Duration.Zero) +// assert(delay < expectedMaximumDelay) +// } +// } + +// test(0, 100.milliseconds) +// test(1, 200.milliseconds) +// test(2, 400.milliseconds) +// test(3, 800.milliseconds) +// test(4, 1600.milliseconds) +// test(5, 3200.milliseconds) +// } + +// behavior of "all built-in policies" + +// it should "never try to create a FiniteDuration of more than Long.MaxValue nanoseconds" in check { +// (labelledPolicy: LabelledRetryPolicy, status: RetryStatus) => +// labelledPolicy.policy.decideNextRetry(status) match { +// case PolicyDecision.DelayAndRetry(nextDelay) => +// nextDelay.toNanos <= Long.MaxValue +// case PolicyDecision.GiveUp => true +// } +// } + +// behavior of "limitRetries" + +// it should "retry with no delay until the limit is reached" in check { +// (status: RetryStatus) => +// val limit = 500 +// val verdict = +// limitRetries[Id](limit).decideNextRetry(status) +// if (status.retriesSoFar < limit) { +// verdict == PolicyDecision.DelayAndRetry(Duration.Zero) +// } else { +// verdict == PolicyDecision.GiveUp +// } +// } + +// behavior of "capDelay" + +// it should "cap the delay" in { +// check { (status: RetryStatus) => +// capDelay(100.milliseconds, constantDelay[Id](101.milliseconds)) +// .decideNextRetry(status) == DelayAndRetry(100.milliseconds) +// } + +// check { (status: RetryStatus) => +// capDelay(100.milliseconds, constantDelay[Id](99.milliseconds)) +// .decideNextRetry(status) == DelayAndRetry(99.milliseconds) +// } +// } + +// behavior of "limitRetriesByDelay" + +// it should "give up if the underlying policy chooses a delay greater than the threshold" in { +// check { (status: RetryStatus) => +// limitRetriesByDelay(100.milliseconds, constantDelay[Id](101.milliseconds)) +// .decideNextRetry(status) == GiveUp +// } + +// check { (status: RetryStatus) => +// limitRetriesByDelay(100.milliseconds, constantDelay[Id](99.milliseconds)) +// .decideNextRetry(status) == DelayAndRetry(99.milliseconds) +// } +// } + +// behavior of "limitRetriesByCumulativeDelay" + +// it should "give up if cumulativeDelay + underlying policy's next delay >= threshold" in { +// val cumulativeDelay = 400.milliseconds +// val arbitraryRetriesSoFar = 5 +// val arbitraryPreviousDelay = Some(123.milliseconds) +// val status = RetryStatus( +// arbitraryRetriesSoFar, +// cumulativeDelay, +// arbitraryPreviousDelay +// ) + +// val threshold = 500.milliseconds + +// def test( +// underlyingPolicy: RetryPolicy[Id], +// expectedDecision: PolicyDecision +// ) = { +// val policy = limitRetriesByCumulativeDelay(threshold, underlyingPolicy) +// assert(policy.decideNextRetry(status) == expectedDecision) +// } + +// test(constantDelay(98.milliseconds), DelayAndRetry(98.milliseconds)) +// test(constantDelay(99.milliseconds), DelayAndRetry(99.milliseconds)) +// test(constantDelay(100.milliseconds), GiveUp) +// test(constantDelay(101.milliseconds), GiveUp) +// } +// } + +// import cats.{Id, catsInstancesForId} +// import cats.syntax.semigroup._ +// import org.scalatest.flatspec.AnyFlatSpec + +// import scala.concurrent.duration._ + +// class RetryPolicySpec extends AnyFlatSpec { +// behavior of "BoundedSemilattice append" + +// it should "give up if either of the composed policies decides to give up" in { +// val alwaysGiveUp = RetryPolicy.lift[Id](_ => PolicyDecision.GiveUp) +// val alwaysRetry = RetryPolicies.constantDelay[Id](1.second) + +// assert( +// (alwaysGiveUp |+| alwaysRetry) +// .decideNextRetry(RetryStatus.NoRetriesYet) == PolicyDecision.GiveUp +// ) +// assert( +// (alwaysRetry |+| alwaysGiveUp) +// .decideNextRetry(RetryStatus.NoRetriesYet) == PolicyDecision.GiveUp +// ) +// } + +// it should "choose the maximum of the delays if both of the composed policies decides to retry" in { +// val delayOneSecond = +// RetryPolicy.lift[Id](_ => PolicyDecision.DelayAndRetry(1.second)) +// val delayTwoSeconds = +// RetryPolicy.lift[Id](_ => PolicyDecision.DelayAndRetry(2.seconds)) + +// assert( +// (delayOneSecond |+| delayTwoSeconds).decideNextRetry( +// RetryStatus.NoRetriesYet +// ) == PolicyDecision.DelayAndRetry(2.seconds) +// ) +// assert( +// (delayTwoSeconds |+| delayOneSecond).decideNextRetry( +// RetryStatus.NoRetriesYet +// ) == PolicyDecision.DelayAndRetry(2.seconds) +// ) +// } +// }