Skip to content

Commit

Permalink
Ensure redis-cache does not cause load value on the wrong executor
Browse files Browse the repository at this point in the history
  • Loading branch information
geoand committed Mar 30, 2023
1 parent f15a82f commit f954e5f
Show file tree
Hide file tree
Showing 8 changed files with 353 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
import io.quarkus.cache.runtime.AbstractCache;
import io.quarkus.redis.client.RedisClientName;
import io.quarkus.redis.runtime.datasource.Marshaller;
import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.smallrye.mutiny.unchecked.UncheckedFunction;
import io.smallrye.mutiny.vertx.MutinyHelper;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Command;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisConnection;
Expand All @@ -41,6 +44,7 @@ public class RedisCacheImpl<K, V> extends AbstractCache implements RedisCache {
"double", Double.class,
"boolean", Boolean.class);

private final Vertx vertx;
private final Redis redis;

private final RedisCacheInfo cacheInfo;
Expand All @@ -49,9 +53,12 @@ public class RedisCacheImpl<K, V> extends AbstractCache implements RedisCache {

private final Marshaller marshaller;

private final Supplier<Boolean> blockingAllowedSupplier;

public RedisCacheImpl(RedisCacheInfo cacheInfo, Optional<String> redisClientName) {

this(cacheInfo, determineRedisClient(redisClientName));
this(cacheInfo, Arc.container().select(Vertx.class).get(), determineRedisClient(redisClientName),
BlockingOperationControl::isBlockingAllowed);
}

private static Redis determineRedisClient(Optional<String> redisClientName) {
Expand All @@ -64,8 +71,10 @@ private static Redis determineRedisClient(Optional<String> redisClientName) {
}

@SuppressWarnings("unchecked")
public RedisCacheImpl(RedisCacheInfo cacheInfo, Redis redis) {
public RedisCacheImpl(RedisCacheInfo cacheInfo, Vertx vertx, Redis redis, Supplier<Boolean> blockingAllowedSupplier) {
this.vertx = vertx;
this.cacheInfo = cacheInfo;
this.blockingAllowedSupplier = blockingAllowedSupplier;

try {
this.classOfKey = (Class<K>) loadClass(this.cacheInfo.keyType);
Expand Down Expand Up @@ -127,6 +136,7 @@ public <K, V> Uni<V> get(K key, Class<V> clazz, Function<K, V> valueLoader) {
// val = deserialize(GET K)
// if (val == null) => SET K computation.apply(K)
byte[] encodedKey = marshaller.encode(computeActualKey(encodeKey(key)));
boolean isWorkerThread = blockingAllowedSupplier.get();
return withConnection(new Function<RedisConnection, Uni<V>>() {
@Override
public Uni<V> apply(RedisConnection connection) {
Expand All @@ -138,17 +148,39 @@ public Uni<V> apply(V cached) throws Exception {
if (cached != null) {
return Uni.createFrom().item(new StaticSupplier<>(cached));
} else {
V value = valueLoader.apply(key);
if (value == null) {
throw new IllegalArgumentException("Cannot cache `null` value");
}
byte[] encodedValue = marshaller.encode(value);
if (cacheInfo.useOptimisticLocking) {
return multi(connection, set(connection, encodedKey, encodedValue))
.replaceWith(value);
Uni<V> uni;
if (isWorkerThread) {
uni = Uni.createFrom().item(new Supplier<V>() {
@Override
public V get() {
return valueLoader.apply(key);
}
}).runSubscriptionOn(MutinyHelper.blockingExecutor(vertx.getDelegate()));
} else {
return set(connection, encodedKey, encodedValue).replaceWith(value);
uni = Uni.createFrom().item(valueLoader.apply(key));
}

return uni.onItem().call(new Function<V, Uni<?>>() {
@Override
public Uni<?> apply(V value) {
if (value == null) {
throw new IllegalArgumentException("Cannot cache `null` value");
}
byte[] encodedValue = marshaller.encode(value);
Uni<V> result;
if (cacheInfo.useOptimisticLocking) {
result = multi(connection, set(connection, encodedKey, encodedValue))
.replaceWith(value);
} else {
result = set(connection, encodedKey, encodedValue).replaceWith(value);
}
if (isWorkerThread) {
return result
.runSubscriptionOn(MutinyHelper.blockingExecutor(vertx.getDelegate()));
}
return result;
}
});
}
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.time.Duration;
import java.util.*;
import java.util.function.Supplier;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand All @@ -20,6 +21,8 @@

class RedisCacheImplTest extends RedisCacheTestBase {

private static final Supplier<Boolean> BLOCKING_ALLOWED = () -> false;

@AfterEach
void clear() {
redis.send(Request.cmd(Command.FLUSHALL).arg("SYNC")).await().atMost(Duration.ofSeconds(10));
Expand All @@ -32,7 +35,7 @@ public void testPutInTheCache() {
info.name = "foo";
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(2));
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
assertThat(cache.get(k, s -> "hello").await().indefinitely()).isEqualTo("hello");
var r = redis.send(Request.cmd(Command.GET).arg("cache:foo:" + k)).await().indefinitely();
assertThat(r).isNotNull();
Expand All @@ -46,7 +49,7 @@ public void testPutInTheCacheWithOptimisitcLocking() {
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(2));
info.useOptimisticLocking = true;
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
assertThat(cache.get(k, s -> "hello").await().indefinitely()).isEqualTo("hello");
var r = redis.send(Request.cmd(Command.GET).arg("cache:foo:" + k)).await().indefinitely();
assertThat(r).isNotNull();
Expand All @@ -58,7 +61,7 @@ public void testPutAndWaitForInvalidation() {
RedisCacheInfo info = new RedisCacheInfo();
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(1));
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
assertThat(cache.get(k, s -> "hello").await().indefinitely()).isEqualTo("hello");
var x = cache.get(k, String::toUpperCase).await().indefinitely();
assertEquals(x, "hello");
Expand All @@ -70,7 +73,7 @@ public void testManualInvalidation() {
RedisCacheInfo info = new RedisCacheInfo();
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(10));
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
cache.get("foo", s -> "hello").await().indefinitely();
var x = cache.get("foo", String::toUpperCase).await().indefinitely();
assertEquals(x, "hello");
Expand Down Expand Up @@ -113,7 +116,7 @@ public void testGetOrNull() {
RedisCacheInfo info = new RedisCacheInfo();
info.ttl = Optional.of(Duration.ofSeconds(10));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
Person person = cache.getOrNull("foo", Person.class).await().indefinitely();
assertThat(person).isNull();
assertThatTheKeyDoesNotExist("cache:foo");
Expand All @@ -133,7 +136,7 @@ public void testGetOrDefault() {
RedisCacheInfo info = new RedisCacheInfo();
info.ttl = Optional.of(Duration.ofSeconds(10));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
Person person = cache.getOrDefault("foo", new Person("bar", "BAR")).await().indefinitely();
assertThat(person).isNotNull()
.satisfies(p -> {
Expand All @@ -159,7 +162,7 @@ public void testCacheNullValue() {
RedisCacheInfo info = new RedisCacheInfo();
info.ttl = Optional.of(Duration.ofSeconds(10));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

// with custom key
double key = 122334545.0;
Expand All @@ -175,7 +178,7 @@ public void testExceptionInValueLoader() {
info.valueType = Person.class.getName();
info.keyType = Double.class.getName();
info.name = "foo";
RedisCacheImpl<Double, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<Double, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

// with custom key and exception
Double key = 122334545.0;
Expand All @@ -197,7 +200,7 @@ public void testPutShouldPopulateCache() {
info.ttl = Optional.of(Duration.ofSeconds(10));
info.valueType = Person.class.getName();
info.keyType = Integer.class.getName();
RedisCacheImpl<Integer, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<Integer, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

cache.put(1, new Person("luke", "skywalker")).await().indefinitely();
assertThat(cache.get(1, x -> new Person("1", "1")).await().indefinitely()).isEqualTo(new Person("luke", "skywalker"));
Expand All @@ -214,7 +217,7 @@ public void testPutShouldPopulateCacheWithOptimisticLocking() {
info.valueType = Person.class.getName();
info.keyType = Integer.class.getName();
info.useOptimisticLocking = true;
RedisCacheImpl<Integer, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<Integer, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

cache.put(1, new Person("luke", "skywalker")).await().indefinitely();
assertThat(cache.get(1, x -> new Person("1", "1")).await().indefinitely()).isEqualTo(new Person("luke", "skywalker"));
Expand All @@ -231,7 +234,7 @@ public void testThatConnectionsAreRecycled() {
info.name = "foo";
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(1));
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

for (int i = 0; i < 1000; i++) {
String val = "hello-" + i;
Expand All @@ -257,7 +260,7 @@ public void testThatConnectionsAreRecycledWithOptimisticLocking() {
info.valueType = String.class.getName();
info.ttl = Optional.of(Duration.ofSeconds(1));
info.useOptimisticLocking = true;
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

for (int i = 0; i < 1000; i++) {
String val = "hello-" + i;
Expand All @@ -280,7 +283,7 @@ void testWithMissingDefaultType() {
RedisCacheInfo info = new RedisCacheInfo();
info.name = "missing-default-cache";
info.ttl = Optional.of(Duration.ofSeconds(10));
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

assertThatThrownBy(() -> cache.get("test", x -> "value").await().indefinitely())
.isInstanceOf(UnsupportedOperationException.class);
Expand All @@ -303,7 +306,7 @@ void testAsyncGetWithDefaultType() {
info.name = "star-wars";
info.ttl = Optional.of(Duration.ofSeconds(2));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

assertThat(cache
.getAsync("test",
Expand Down Expand Up @@ -334,7 +337,7 @@ void testAsyncGetWithDefaultTypeWithOptimisticLocking() {
info.ttl = Optional.of(Duration.ofSeconds(2));
info.valueType = Person.class.getName();
info.useOptimisticLocking = true;
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

assertThat(cache
.getAsync("test",
Expand Down Expand Up @@ -364,7 +367,7 @@ void testPut() {
info.name = "put";
info.ttl = Optional.of(Duration.ofSeconds(2));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

Person luke = new Person("luke", "skywalker");
Person leia = new Person("leia", "organa");
Expand All @@ -383,7 +386,7 @@ void testPutWithSupplier() {
info.name = "put";
info.ttl = Optional.of(Duration.ofSeconds(2));
info.valueType = Person.class.getName();
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, Person> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

Person luke = new Person("luke", "skywalker");
Person leia = new Person("leia", "organa");
Expand All @@ -403,7 +406,7 @@ void testInitializationWithAnUnknownClass() {
info.ttl = Optional.of(Duration.ofSeconds(2));
info.valueType = Person.class.getPackage().getName() + ".Missing";

assertThatThrownBy(() -> new RedisCacheImpl<>(info, redis))
assertThatThrownBy(() -> new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED))
.isInstanceOf(IllegalArgumentException.class);
}

Expand All @@ -413,7 +416,7 @@ void testGetDefaultKey() {
info.name = "test-default-key";
info.ttl = Optional.of(Duration.ofSeconds(2));

RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);
assertThat(cache.getDefaultKey()).isEqualTo("default-cache-key");

assertThat(cache.getDefaultValueType()).isNull();
Expand All @@ -425,7 +428,7 @@ void testInvalidation() {
info.name = "test-invalidation";
info.ttl = Optional.of(Duration.ofSeconds(10));

RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, redis);
RedisCacheImpl<String, String> cache = new RedisCacheImpl<>(info, vertx, redis, BLOCKING_ALLOWED);

redis.send(Request.cmd(Command.SET).arg("key6").arg("my-value")).await().indefinitely();

Expand Down
17 changes: 17 additions & 0 deletions integration-tests/redis-cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-redis-cache</artifactId>
Expand Down Expand Up @@ -64,6 +68,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client-reactive-jackson-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit f954e5f

Please sign in to comment.