From 250a3ac2b27ac5c5a44e53bf1cbb1e7c1163c23a Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Thu, 21 Sep 2023 14:55:56 +0900 Subject: [PATCH 1/6] Add rollbackIfFailed parameter in refresh --- .../src/main/scala/zio/cache/Cache.scala | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index da44d61..5d86f00 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -76,7 +76,7 @@ abstract class Cache[-Key, +Error, +Value] { * by the lookup function. Additionally, `refresh` always triggers the * lookup function, disregarding the last `Error`. */ - def refresh(key: Key): IO[Error, Unit] + def refresh(key: Key, rollbackIfFailed: Boolean = false): IO[Error, Unit] /** * Invalidates the value associated with the specified key. @@ -247,7 +247,7 @@ object Cache { } } - override def refresh(in: In): IO[Error, Unit] = + override def refresh(in: In, rollbackIfFailed: Boolean = false): IO[Error, Unit] = ZIO.suspendSucceedUnsafe { implicit u => val k = keyBy(in) val promise = newPromise() @@ -267,7 +267,9 @@ object Cache { get(in) } else { // Only trigger the lookup if we're still the current value, `completedResult` - lookupValueOf(in, promise).when { + val rollbackValueIfFailed: Option[MapValue.Complete[Key, Error, Value]] = + if (rollbackIfFailed) Some(completedResult) else None + lookupValueOf(in, promise, rollbackValueIfFailed).when { map.replace(k, completedResult, MapValue.Refreshing(promise, completedResult)) } } @@ -292,7 +294,11 @@ object Cache { def size(implicit trace: Trace): UIO[Int] = ZIO.succeed(map.size) - private def lookupValueOf(in: In, promise: Promise[Error, Value]): IO[Error, Value] = + private def lookupValueOf( + in: In, + promise: Promise[Error, Value], + rollbackValueIfFailed: Option[MapValue.Complete[Key, Error, Value]] = None + ): IO[Error, Value] = ZIO.suspendSucceed { val key = keyBy(in) lookup(in) @@ -302,8 +308,14 @@ object Cache { val now = Unsafe.unsafe(implicit u => clock.unsafe.instant()) val entryStats = EntryStats(now) - map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) - promise.done(exit) *> ZIO.done(exit) + rollbackValueIfFailed match { + case Some(rollbackValue) if exit.isFailure => + map.put(key, rollbackValue) + promise.done(rollbackValue.exit) *> ZIO.done(exit) + case _ => + map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) + promise.done(exit) *> ZIO.done(exit) + } } .onInterrupt(promise.interrupt *> ZIO.succeed(map.remove(key))) } From 25c393322767f0e2088663edd0a7f77e13dd185b Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Fri, 22 Sep 2023 10:16:56 +0900 Subject: [PATCH 2/6] Implement another operator `refreshValue` --- .../src/main/scala/zio/cache/Cache.scala | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index 5d86f00..2d83975 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -76,7 +76,13 @@ abstract class Cache[-Key, +Error, +Value] { * by the lookup function. Additionally, `refresh` always triggers the * lookup function, disregarding the last `Error`. */ - def refresh(key: Key, rollbackIfFailed: Boolean = false): IO[Error, Unit] + def refresh(key: Key): IO[Error, Unit] + + /** + * Computes the value associated with the specified key, with the lookup + * function, and puts it in the cache only if it is a value, not an error. + */ + def refreshValue(key: Key): IO[Error, Unit] /** * Invalidates the value associated with the specified key. @@ -247,7 +253,13 @@ object Cache { } } - override def refresh(in: In, rollbackIfFailed: Boolean = false): IO[Error, Unit] = + def refresh(in: In): IO[Error, Unit] = + refresh(in, rollbackIfError = false) + + def refreshValue(in: In): IO[Error, Unit] = + refresh(in, rollbackIfError = true) + + private def refresh(in: In, rollbackIfError: Boolean): IO[Error, Unit] = ZIO.suspendSucceedUnsafe { implicit u => val k = keyBy(in) val promise = newPromise() @@ -267,9 +279,9 @@ object Cache { get(in) } else { // Only trigger the lookup if we're still the current value, `completedResult` - val rollbackValueIfFailed: Option[MapValue.Complete[Key, Error, Value]] = - if (rollbackIfFailed) Some(completedResult) else None - lookupValueOf(in, promise, rollbackValueIfFailed).when { + val rollbackResultIfError: Option[MapValue.Complete[Key, Error, Value]] = + if (rollbackIfError) Some(completedResult) else None + lookupValueOf(in, promise, rollbackResultIfError).when { map.replace(k, completedResult, MapValue.Refreshing(promise, completedResult)) } } @@ -297,7 +309,7 @@ object Cache { private def lookupValueOf( in: In, promise: Promise[Error, Value], - rollbackValueIfFailed: Option[MapValue.Complete[Key, Error, Value]] = None + rollbackResultIfError: Option[MapValue.Complete[Key, Error, Value]] = None ): IO[Error, Value] = ZIO.suspendSucceed { val key = keyBy(in) @@ -308,10 +320,10 @@ object Cache { val now = Unsafe.unsafe(implicit u => clock.unsafe.instant()) val entryStats = EntryStats(now) - rollbackValueIfFailed match { - case Some(rollbackValue) if exit.isFailure => - map.put(key, rollbackValue) - promise.done(rollbackValue.exit) *> ZIO.done(exit) + rollbackResultIfError match { + case Some(rollbackResult) if exit.isFailure => + map.put(key, rollbackResult) + promise.done(rollbackResult.exit) *> ZIO.done(exit) case _ => map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) promise.done(exit) *> ZIO.done(exit) From 481871e9a4c0cdc721ae5cb3ccaec19db8b6c474 Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Fri, 22 Sep 2023 10:33:54 +0900 Subject: [PATCH 3/6] Return exit on promise --- zio-cache/shared/src/main/scala/zio/cache/Cache.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index 2d83975..0b3b734 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -323,11 +323,11 @@ object Cache { rollbackResultIfError match { case Some(rollbackResult) if exit.isFailure => map.put(key, rollbackResult) - promise.done(rollbackResult.exit) *> ZIO.done(exit) case _ => map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) - promise.done(exit) *> ZIO.done(exit) } + + promise.done(exit) *> ZIO.done(exit) } .onInterrupt(promise.interrupt *> ZIO.succeed(map.remove(key))) } From 3df9381a179f820aada5980c8b0e211ac3742302 Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Fri, 22 Sep 2023 11:11:18 +0900 Subject: [PATCH 4/6] Implement test about `refreshValue` --- .../src/test/scala/zio/cache/CacheSpec.scala | 66 ++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala b/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala index 3d2cf46..6d95891 100644 --- a/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala +++ b/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala @@ -114,13 +114,17 @@ object CacheSpec extends ZIOSpecDefault { _ <- cache.refresh(key) val1 <- cache.get(key).either _ <- cache.refresh(key) + val2 <- cache.get(key).either failure2 <- cache.refresh(key).either + failure3 <- cache.get(key).either _ <- cache.refresh(key) - val2 <- cache.get(key).either + val3 <- cache.get(key).either } yield assert(failure1)(isLeft(equalTo(error))) && assert(failure2)(isLeft(equalTo(error))) && + assert(failure3)(isLeft(equalTo(error))) && assert(val1)(isRight(equalTo(4))) && - assert(val2)(isRight(equalTo(7))) + assert(val2)(isRight(equalTo(5))) && + assert(val3)(isRight(equalTo(7))) }, test("should get the value if the key doesn't exist in the cache") { check(Gen.int) { salt => @@ -134,6 +138,64 @@ object CacheSpec extends ZIOSpecDefault { } } ), + suite("`refreshValue` method")( + test("should update the cache with a new value") { + def inc(n: Int) = n * 10 + + def retrieve(multiplier: Ref[Int])(key: Int) = + multiplier + .updateAndGet(inc) + .map(key * _) + + val seed = 1 + val key = 123 + for { + ref <- Ref.make(seed) + cache <- Cache.make(1, Duration.Infinity, Lookup(retrieve(ref))) + val1 <- cache.get(key) + _ <- cache.refreshValue(key) + _ <- cache.get(key) + val2 <- cache.get(key) + } yield assertTrue(val1 == inc(key)) && assertTrue(val2 == inc(val1)) + }, + test("should update the cache only if lookup return a value, not an error.") { + + val error = new RuntimeException("Must be a multiple of 3") + + def inc(n: Int) = n + 1 + + def retrieve(number: Ref[Int])(key: Int) = + number + .updateAndGet(inc) + .flatMap { + case n if n % 3 == 0 => + ZIO.fail(error) + case n => + ZIO.succeed(key * n) + } + + val seed = 2 + val key = 1 + for { + ref <- Ref.make(seed) + cache <- Cache.make(1, Duration.Infinity, Lookup(retrieve(ref))) + failure1 <- cache.get(key).either + _ <- cache.refreshValue(key) + val1 <- cache.get(key).either + _ <- cache.refreshValue(key) + val2 <- cache.get(key).either + failure2 <- cache.refreshValue(key).either + val3 <- cache.get(key).either + _ <- cache.refreshValue(key) + val4 <- cache.get(key).either + } yield assert(failure1)(isLeft(equalTo(error))) && + assert(failure2)(isLeft(equalTo(error))) && + assert(val1)(isRight(equalTo(4))) && + assert(val2)(isRight(equalTo(5))) && + assert(val3)(isRight(equalTo(5))) && + assert(val4)(isRight(equalTo(7))) + } + ), test("size") { check(Gen.int) { salt => for { From 5da82d1ae7e9f701cab11c237c8bbf6dc7678edf Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Fri, 22 Sep 2023 11:36:58 +0900 Subject: [PATCH 5/6] Add case for rollback to empty cache --- .../src/main/scala/zio/cache/Cache.scala | 30 ++++++++++++------- .../src/test/scala/zio/cache/CacheSpec.scala | 27 +++++++++++++++++ 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index 0b3b734..1d8cf27 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -268,7 +268,8 @@ object Cache { value = map.putIfAbsent(k, MapValue.Pending(new MapKey(k), promise)) } val result = if (value eq null) { - lookupValueOf(in, promise) + val rollbackResultIfError = if (rollbackIfError) Right(None) else Left(()) + lookupValueOf(in, promise, rollbackResultIfError) } else { value match { case MapValue.Pending(_, promiseInProgress) => @@ -279,8 +280,7 @@ object Cache { get(in) } else { // Only trigger the lookup if we're still the current value, `completedResult` - val rollbackResultIfError: Option[MapValue.Complete[Key, Error, Value]] = - if (rollbackIfError) Some(completedResult) else None + val rollbackResultIfError = if (rollbackIfError) Right(Some(completedResult)) else Left(()) lookupValueOf(in, promise, rollbackResultIfError).when { map.replace(k, completedResult, MapValue.Refreshing(promise, completedResult)) } @@ -309,7 +309,12 @@ object Cache { private def lookupValueOf( in: In, promise: Promise[Error, Value], - rollbackResultIfError: Option[MapValue.Complete[Key, Error, Value]] = None + /** + * Left(()): Put the lookup result. + * Right(None): Remove key if there is a error. + * Right(Some(rollbackResult)): Rollback if there is a error. + */ + rollbackResultIfError: Either[Unit, Option[MapValue.Complete[Key, Error, Value]]] = Left(()) ): IO[Error, Value] = ZIO.suspendSucceed { val key = keyBy(in) @@ -320,12 +325,17 @@ object Cache { val now = Unsafe.unsafe(implicit u => clock.unsafe.instant()) val entryStats = EntryStats(now) - rollbackResultIfError match { - case Some(rollbackResult) if exit.isFailure => - map.put(key, rollbackResult) - case _ => - map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) - } + if (exit.isSuccess) + map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) + else + rollbackResultIfError match { + case Left(()) => + map.put(key, MapValue.Complete(new MapKey(key), exit, entryStats, now.plus(timeToLive(exit)))) + case Right(None) => + map.remove(key) + case Right(Some(rollbackResult)) => + map.put(key, rollbackResult) + } promise.done(exit) *> ZIO.done(exit) } diff --git a/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala b/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala index 6d95891..3dd293d 100644 --- a/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala +++ b/zio-cache/shared/src/test/scala/zio/cache/CacheSpec.scala @@ -194,6 +194,33 @@ object CacheSpec extends ZIOSpecDefault { assert(val2)(isRight(equalTo(5))) && assert(val3)(isRight(equalTo(5))) && assert(val4)(isRight(equalTo(7))) + }, + test("should update only if it is a value when the key doesn't exist in the cache") { + + val error = new RuntimeException("Must be a multiple of 3") + + def inc(n: Int) = n + 1 + + def retrieve(number: Ref[Int])(key: Int) = + number + .updateAndGet(inc) + .flatMap { + case n if n % 3 == 0 => + ZIO.fail(error) + case n => + ZIO.succeed(key * n) + } + + val seed = 2 + val key = 1 + val cap = 30 + for { + ref <- Ref.make(seed) + cache <- Cache.make(cap, Duration.Infinity, Lookup(retrieve(ref))) + count0 <- cache.size + _ <- ZIO.foreachDiscard(1 to cap)(key => cache.refreshValue(key).either) + count1 <- cache.size + } yield assertTrue(count0 == 0) && assertTrue(count1 == cap / 3 * 2) } ), test("size") { From bc0094197a3dac61f855544d5de1aa044b6ba630 Mon Sep 17 00:00:00 2001 From: Jongbeom Kim Date: Fri, 22 Sep 2023 12:01:51 +0900 Subject: [PATCH 6/6] Fix typo --- zio-cache/shared/src/main/scala/zio/cache/Cache.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index 1d8cf27..3f4d52d 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -311,8 +311,8 @@ object Cache { promise: Promise[Error, Value], /** * Left(()): Put the lookup result. - * Right(None): Remove key if there is a error. - * Right(Some(rollbackResult)): Rollback if there is a error. + * Right(None): Remove key if there is an error. + * Right(Some(rollbackResult)): Rollback if there is an error. */ rollbackResultIfError: Either[Unit, Option[MapValue.Complete[Key, Error, Value]]] = Left(()) ): IO[Error, Value] =