Skip to content

Commit

Permalink
Merge pull request #159 from graphql-java/reactive-streams-branch-mov…
Browse files Browse the repository at this point in the history
…e-reactive-classes-out-of-dataloader-helper

Reactive streams branch move reactive classes out of dataloader helper
  • Loading branch information
bbakerman authored May 24, 2024
2 parents e9bfc2b + 170ccf8 commit 77fd0dd
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 241 deletions.
256 changes: 15 additions & 241 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.impl.DataLoaderAssertionException;
import org.dataloader.reactive.ReactiveSupport;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
Expand All @@ -12,13 +12,11 @@
import org.dataloader.stats.context.IncrementLoadCountStatisticsContext;
import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -510,7 +508,7 @@ private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoade

private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
Subscriber<V> subscriber = new DataLoaderSubscriber(loadResult, keys, keyContexts, queuedFutures);
Subscriber<V> subscriber = ReactiveSupport.batchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());

BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
if (batchLoadFunction instanceof BatchPublisherWithContext) {
Expand All @@ -537,7 +535,7 @@ private CompletableFuture<List<V>> invokeBatchPublisher(List<K> keys, List<Objec

private CompletableFuture<List<V>> invokeMappedBatchPublisher(List<K> keys, List<Object> keyContexts, List<CompletableFuture<V>> queuedFutures, BatchLoaderEnvironment environment) {
CompletableFuture<List<V>> loadResult = new CompletableFuture<>();
Subscriber<Map.Entry<K, V>> subscriber = new DataLoaderMapEntrySubscriber(loadResult, keys, keyContexts, queuedFutures);
Subscriber<Map.Entry<K, V>> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration());

BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
if (batchLoadFunction instanceof MappedBatchPublisherWithContext) {
Expand Down Expand Up @@ -625,246 +623,22 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
}

/**********************************************************************************************
* ********************************************************************************************
* <p>
* The reactive support classes start here
*
* @param <T> for two
**********************************************************************************************
**********************************************************************************************
*/
private abstract class DataLoaderSubscriberBase<T> implements Subscriber<T> {

final CompletableFuture<List<V>> valuesFuture;
final List<K> keys;
final List<Object> callContexts;
final List<CompletableFuture<V>> queuedFutures;

List<K> clearCacheKeys = new ArrayList<>();
List<V> completedValues = new ArrayList<>();
boolean onErrorCalled = false;
boolean onCompleteCalled = false;

DataLoaderSubscriberBase(
CompletableFuture<List<V>> valuesFuture,
List<K> keys,
List<Object> callContexts,
List<CompletableFuture<V>> queuedFutures
) {
this.valuesFuture = valuesFuture;
this.keys = keys;
this.callContexts = callContexts;
this.queuedFutures = queuedFutures;
}

@Override
public void onSubscribe(Subscription subscription) {
subscription.request(keys.size());
}

@Override
public void onNext(T v) {
assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked.");
assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked.");
}

@Override
public void onComplete() {
assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked.");
onCompleteCalled = true;
}

@Override
public void onError(Throwable throwable) {
assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked.");
onErrorCalled = true;

stats.incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts));
}

/*
* A value has arrived - how do we complete the future that's associated with it in a common way
*/
void onNextValue(K key, V value, Object callContext, List<CompletableFuture<V>> futures) {
if (value instanceof Try) {
// we allow the batch loader to return a Try so we can better represent a computation
// that might have worked or not.
//noinspection unchecked
Try<V> tryValue = (Try<V>) value;
if (tryValue.isSuccess()) {
futures.forEach(f -> f.complete(tryValue.get()));
} else {
stats.incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext));
futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable()));
clearCacheKeys.add(key);
}
} else {
futures.forEach(f -> f.complete(value));
private ReactiveSupport.HelperIntegration<K> helperIntegration() {
return new ReactiveSupport.HelperIntegration<>() {
@Override
public StatisticsCollector getStats() {
return stats;
}
}

Throwable unwrapThrowable(Throwable ex) {
if (ex instanceof CompletionException) {
ex = ex.getCause();
@Override
public void clearCacheView(K key) {
dataLoader.clear(key);
}
return ex;
}
}

private class DataLoaderSubscriber extends DataLoaderSubscriberBase<V> {

private int idx = 0;

private DataLoaderSubscriber(
CompletableFuture<List<V>> valuesFuture,
List<K> keys,
List<Object> callContexts,
List<CompletableFuture<V>> queuedFutures
) {
super(valuesFuture, keys, callContexts, queuedFutures);
}

// onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
// correctness (at the cost of speed).
@Override
public synchronized void onNext(V value) {
super.onNext(value);

if (idx >= keys.size()) {
// hang on they have given us more values than we asked for in keys
// we cant handle this
return;
}
K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
onNextValue(key, value, callContext, List.of(future));

completedValues.add(value);
idx++;
}


@Override
public synchronized void onComplete() {
super.onComplete();
if (keys.size() != completedValues.size()) {
// we have more or less values than promised
// we will go through all the outstanding promises and mark those that
// have not finished as failed
for (CompletableFuture<V> queuedFuture : queuedFutures) {
if (!queuedFuture.isDone()) {
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
}
}
@Override
public void clearCacheEntriesOnExceptions(List<K> keys) {
possiblyClearCacheEntriesOnExceptions(keys);
}
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
valuesFuture.complete(completedValues);
}

@Override
public synchronized void onError(Throwable ex) {
super.onError(ex);
ex = unwrapThrowable(ex);
// Set the remaining keys to the exception.
for (int i = idx; i < queuedFutures.size(); i++) {
K key = keys.get(i);
CompletableFuture<V> future = queuedFutures.get(i);
if (! future.isDone()) {
future.completeExceptionally(ex);
// clear any cached view of this key because it failed
dataLoader.clear(key);
}
}
valuesFuture.completeExceptionally(ex);
}

}

private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase<Map.Entry<K, V>> {

private final Map<K, Object> callContextByKey;
private final Map<K, List<CompletableFuture<V>>> queuedFuturesByKey;
private final Map<K, V> completedValuesByKey = new HashMap<>();


private DataLoaderMapEntrySubscriber(
CompletableFuture<List<V>> valuesFuture,
List<K> keys,
List<Object> callContexts,
List<CompletableFuture<V>> queuedFutures
) {
super(valuesFuture, keys, callContexts, queuedFutures);
this.callContextByKey = new HashMap<>();
this.queuedFuturesByKey = new HashMap<>();
for (int idx = 0; idx < queuedFutures.size(); idx++) {
K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> queuedFuture = queuedFutures.get(idx);
callContextByKey.put(key, callContext);
queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture);
}
}


@Override
public synchronized void onNext(Map.Entry<K, V> entry) {
super.onNext(entry);
K key = entry.getKey();
V value = entry.getValue();

Object callContext = callContextByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());

onNextValue(key, value, callContext, futures);

// did we have an actual key for this value - ignore it if they send us one outside the key set
if (!futures.isEmpty()) {
completedValuesByKey.put(key, value);
}
}

@Override
public synchronized void onComplete() {
super.onComplete();

possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
List<V> values = new ArrayList<>(keys.size());
for (K key : keys) {
V value = completedValuesByKey.get(key);
values.add(value);

List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
for (CompletableFuture<V> future : futures) {
if (! future.isDone()) {
// we have a future that never came back for that key
// but the publisher is done sending in data - it must be null
// e.g. for key X when found no value
future.complete(null);
}
}
}
valuesFuture.complete(values);
}

@Override
public synchronized void onError(Throwable ex) {
super.onError(ex);
ex = unwrapThrowable(ex);
// Complete the futures for the remaining keys with the exception.
for (int idx = 0; idx < queuedFutures.size(); idx++) {
K key = keys.get(idx);
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
if (!completedValuesByKey.containsKey(key)) {
for (CompletableFuture<V> future : futures) {
future.completeExceptionally(ex);
}
// clear any cached view of this key because they all failed
dataLoader.clear(key);
}
}
valuesFuture.completeExceptionally(ex);
}
};
}
}
Loading

0 comments on commit 77fd0dd

Please sign in to comment.