Skip to content

Commit

Permalink
Merge pull request #158 from graphql-java/reactive-streams-branch-ext…
Browse files Browse the repository at this point in the history
…ra-tests-for-reactive

More tests for Publishers on reactive branch
  • Loading branch information
bbakerman authored May 23, 2024
2 parents 5d826b8 + 8b344db commit e9bfc2b
Show file tree
Hide file tree
Showing 9 changed files with 586 additions and 233 deletions.
52 changes: 45 additions & 7 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.impl.DataLoaderAssertionException;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
Expand Down Expand Up @@ -624,6 +625,15 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
}

/**********************************************************************************************
* ********************************************************************************************
* <p>
* The reactive support classes start here
*
* @param <T> for two
**********************************************************************************************
**********************************************************************************************
*/
private abstract class DataLoaderSubscriberBase<T> implements Subscriber<T> {

final CompletableFuture<List<V>> valuesFuture;
Expand Down Expand Up @@ -721,6 +731,11 @@ private DataLoaderSubscriber(
public synchronized void onNext(V value) {
super.onNext(value);

if (idx >= keys.size()) {
// hang on they have given us more values than we asked for in keys
// we cant handle this
return;
}
K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
Expand All @@ -734,8 +749,16 @@ public synchronized void onNext(V value) {
@Override
public synchronized void onComplete() {
super.onComplete();
assertResultSize(keys, completedValues);

if (keys.size() != completedValues.size()) {
// we have more or less values than promised
// we will go through all the outstanding promises and mark those that
// have not finished as failed
for (CompletableFuture<V> queuedFuture : queuedFutures) {
if (!queuedFuture.isDone()) {
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
}
}
}
possiblyClearCacheEntriesOnExceptions(clearCacheKeys);
valuesFuture.complete(completedValues);
}
Expand All @@ -748,9 +771,11 @@ public synchronized void onError(Throwable ex) {
for (int i = idx; i < queuedFutures.size(); i++) {
K key = keys.get(i);
CompletableFuture<V> future = queuedFutures.get(i);
future.completeExceptionally(ex);
// clear any cached view of this key because they all failed
dataLoader.clear(key);
if (! future.isDone()) {
future.completeExceptionally(ex);
// clear any cached view of this key because it failed
dataLoader.clear(key);
}
}
valuesFuture.completeExceptionally(ex);
}
Expand Down Expand Up @@ -790,11 +815,14 @@ public synchronized void onNext(Map.Entry<K, V> entry) {
V value = entry.getValue();

Object callContext = callContextByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.get(key);
List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());

onNextValue(key, value, callContext, futures);

completedValuesByKey.put(key, value);
// did we have an actual key for this value - ignore it if they send us one outside the key set
if (!futures.isEmpty()) {
completedValuesByKey.put(key, value);
}
}

@Override
Expand All @@ -806,6 +834,16 @@ public synchronized void onComplete() {
for (K key : keys) {
V value = completedValuesByKey.get(key);
values.add(value);

List<CompletableFuture<V>> futures = queuedFuturesByKey.getOrDefault(key, List.of());
for (CompletableFuture<V> future : futures) {
if (! future.isDone()) {
// we have a future that never came back for that key
// but the publisher is done sending in data - it must be null
// e.g. for key X when found no value
future.complete(null);
}
}
}
valuesFuture.complete(values);
}
Expand Down
Loading

0 comments on commit e9bfc2b

Please sign in to comment.