diff --git a/README.md b/README.md index 24a65f6..9ffe265 100644 --- a/README.md +++ b/README.md @@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob // ... ``` +### Returning a stream of results from your batch publisher + +It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream. + +For example, let's say you wanted to load many users from a service without forcing the service to load all +users into its memory (which may exert considerable pressure on it). + +A `org.dataloader.BatchPublisher` may be used to load this data: + +```java + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + userManager.publishUsersById(userIds, userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + + // ... +``` + +Rather than waiting for all values to be returned, this `DataLoader` will complete +the `CompletableFuture` returned by `Dataloader#load(Long)` as each value is +processed. + +If an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +You *MUST* ensure that the values are streamed in the same order as the keys provided, +with the same cardinality (i.e. the number of values must match the number of keys). +Failing to do so will result in incorrect data being returned from `DataLoader#load`. + + +### Returning a mapped stream of results from your batch publisher + +Your publisher may not necessarily return values in the same order in which it processes keys. + +For example, let's say your batch publisher function loads user data which is spread across shards, +with some shards responding more quickly than others. + +In instances like these, `org.dataloader.MappedBatchPublisher` can be used. + +```java + MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + userManager.publishUsersById(userIds, userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + + // ... +``` + +Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed +exceptionally. + +Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys, +or even the same number of values. + ### Error object is not a thing in a type safe Java world In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected @@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; ``` diff --git a/build.gradle b/build.gradle index 36a620a..ccbcec6 100644 --- a/build.gradle +++ b/build.gradle @@ -30,7 +30,6 @@ def getDevelopmentVersion() { version } -def slf4jVersion = '1.7.30' def releaseVersion = System.env.RELEASE_VERSION version = releaseVersion ? releaseVersion : getDevelopmentVersion() group = 'com.graphql-java' @@ -64,16 +63,23 @@ jar { } } +def slf4jVersion = '1.7.30' +def reactiveStreamsVersion = '1.0.3' + dependencies { api 'org.slf4j:slf4j-api:' + slf4jVersion + api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion + testImplementation 'org.slf4j:slf4j-simple:' + slf4jVersion testImplementation 'org.awaitility:awaitility:2.0.0' testImplementation "org.hamcrest:hamcrest:2.2" + testImplementation 'io.projectreactor:reactor-core:3.6.6' testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0' testImplementation platform('org.junit:junit-bom:5.10.2') testImplementation 'org.junit.jupiter:junit-jupiter-api' testImplementation 'org.junit.jupiter:junit-jupiter-params' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine' + testImplementation 'io.projectreactor:reactor-core:3.6.6' } task sourcesJar(type: Jar) { diff --git a/src/main/java/org/dataloader/BatchPublisher.java b/src/main/java/org/dataloader/BatchPublisher.java new file mode 100644 index 0000000..c499226 --- /dev/null +++ b/src/main/java/org/dataloader/BatchPublisher.java @@ -0,0 +1,36 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function must call the provided {@link Subscriber} to process the values it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + * @see BatchLoader for the non-reactive version + */ +public interface BatchPublisher { + /** + * Called to batch the provided keys into a stream of values. You must provide + * the same number of values as there as keys, and they must be in the order of the keys. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + */ + void load(List keys, Subscriber subscriber); +} diff --git a/src/main/java/org/dataloader/BatchPublisherWithContext.java b/src/main/java/org/dataloader/BatchPublisherWithContext.java new file mode 100644 index 0000000..4eadfe9 --- /dev/null +++ b/src/main/java/org/dataloader/BatchPublisherWithContext.java @@ -0,0 +1,34 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; + +/** + * This form of {@link BatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object + * that encapsulates the calling context. A typical use case is passing in security credentials or database details + * for example. + *

+ * See {@link BatchPublisher} for more details on the design invariants that you must implement in order to + * use this interface. + */ +public interface BatchPublisherWithContext { + /** + * Called to batch the provided keys into a stream of values. You must provide + * the same number of values as there as keys, and they must be in the order of the keys. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * NOTE: It is required that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as + * the provided keys and that you provide a value for every key provided. + *

+ * This is given an environment object to that maybe be useful during the call. A typical use case + * is passing in security credentials or database details for example. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + * @param environment an environment object that can help with the call + */ + void load(List keys, Subscriber subscriber, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/DataLoaderFactory.java b/src/main/java/org/dataloader/DataLoaderFactory.java index 013f473..db14f2e 100644 --- a/src/main/java/org/dataloader/DataLoaderFactory.java +++ b/src/main/java/org/dataloader/DataLoaderFactory.java @@ -278,6 +278,274 @@ public static DataLoader newMappedDataLoaderWithTry(MappedBatchLoad return mkDataLoader(batchLoadFunction, options); } + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction) { + return newPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisher batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction) { + return newPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newDataLoaderWithTry(BatchLoader) + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisher> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction) { + return newPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoader(BatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction) { + return newPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newPublisherDataLoaderWithTry(BatchPublisher) + */ + public static DataLoader newPublisherDataLoaderWithTry(BatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction) { + return newMappedPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisher batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction) { + return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newDataLoaderWithTry(BatchLoader) + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisher> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size). + * + * @param batchLoadFunction the batch load function to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction) { + return newMappedPublisherDataLoader(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function with the provided options + * + * @param batchLoadFunction the batch load function to use + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoader(MappedBatchPublisherWithContext batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + + /** + * Creates new DataLoader with the specified batch loader function and default options + * (batching, caching and unlimited batch size) where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + *

+ * If it's important you to know the exact status of each item in a batch call and whether it threw exceptions then + * you can use this form to create the data loader. + *

+ * Using Try objects allows you to capture a value returned or an exception that might + * have occurred trying to get a value. . + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param the key type + * @param the value type + * + * @return a new DataLoader + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction) { + return newMappedPublisherDataLoaderWithTry(batchLoadFunction, null); + } + + /** + * Creates new DataLoader with the specified batch loader function and with the provided options + * where the batch loader function returns a list of + * {@link org.dataloader.Try} objects. + * + * @param batchLoadFunction the batch load function to use that uses {@link org.dataloader.Try} objects + * @param options the options to use + * @param the key type + * @param the value type + * + * @return a new DataLoader + * + * @see #newMappedPublisherDataLoaderWithTry(MappedBatchPublisher) + */ + public static DataLoader newMappedPublisherDataLoaderWithTry(MappedBatchPublisherWithContext> batchLoadFunction, DataLoaderOptions options) { + return mkDataLoader(batchLoadFunction, options); + } + static DataLoader mkDataLoader(Object batchLoadFunction, DataLoaderOptions options) { return new DataLoader<>(batchLoadFunction, options); } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index d934de2..9cd38d6 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.reactive.ReactiveSupport; import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; @@ -10,6 +11,7 @@ import org.dataloader.stats.context.IncrementCacheHitCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadCountStatisticsContext; import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.reactivestreams.Subscriber; import java.time.Clock; import java.time.Instant; @@ -152,11 +154,13 @@ CompletableFuture load(K key, Object loadContext) { } } + @SuppressWarnings("unchecked") Object getCacheKey(K key) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKey(key) : key; } + @SuppressWarnings("unchecked") Object getCacheKeyWithContext(K key, Object context) { return loaderOptions.cacheKeyFunction().isPresent() ? loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; @@ -241,10 +245,14 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size(), new IncrementBatchLoadCountByStatisticsContext<>(keys, callContexts)); - CompletableFuture> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); + CompletableFuture> batchLoad = invokeLoader(keys, callContexts, queuedFutures, loaderOptions.cachingEnabled()); return batchLoad .thenApply(values -> { assertResultSize(keys, values); + if (isPublisher() || isMappedPublisher()) { + // We have already completed the queued futures by the time the overall batchLoad future has completed. + return values; + } List clearCacheKeys = new ArrayList<>(); for (int idx = 0; idx < queuedFutures.size(); idx++) { @@ -342,14 +350,15 @@ private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, bool CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); List keyContexts = singletonList(keyContext); - return invokeLoader(keys, keyContexts, cachingEnabled) + List> queuedFutures = singletonList(new CompletableFuture<>()); + return invokeLoader(keys, keyContexts, queuedFutures, cachingEnabled) .thenApply(list -> list.get(0)) .toCompletableFuture(); } - CompletableFuture> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures, boolean cachingEnabled) { if (!cachingEnabled) { - return invokeLoader(keys, keyContexts); + return invokeLoader(keys, keyContexts, queuedFutures); } CompletableFuture>> cacheCallCF = getFromValueCache(keys); return cacheCallCF.thenCompose(cachedValues -> { @@ -360,6 +369,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, List missedKeyIndexes = new ArrayList<>(); List missedKeys = new ArrayList<>(); List missedKeyContexts = new ArrayList<>(); + List> missedQueuedFutures = new ArrayList<>(); // if they return a ValueCachingNotSupported exception then we insert this special marker value, and it // means it's a total miss, we need to get all these keys via the batch loader @@ -369,6 +379,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, missedKeyIndexes.add(i); missedKeys.add(keys.get(i)); missedKeyContexts.add(keyContexts.get(i)); + missedQueuedFutures.add(queuedFutures.get(i)); } } else { assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); @@ -393,7 +404,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, // we missed some keys from cache, so send them to the batch loader // and then fill in their values // - CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts); + CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts, missedQueuedFutures); return batchLoad.thenCompose(missedValues -> { assertResultSize(missedKeys, missedValues); @@ -412,8 +423,7 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, }); } - - CompletableFuture> invokeLoader(List keys, List keyContexts) { + CompletableFuture> invokeLoader(List keys, List keyContexts, List> queuedFutures) { CompletableFuture> batchLoad; try { Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); @@ -421,6 +431,10 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) .context(context).keyContexts(keys, keyContexts).build(); if (isMapLoader()) { batchLoad = invokeMapBatchLoader(keys, environment); + } else if (isPublisher()) { + batchLoad = invokeBatchPublisher(keys, keyContexts, queuedFutures, environment); + } else if (isMappedPublisher()) { + batchLoad = invokeMappedBatchPublisher(keys, keyContexts, queuedFutures, environment); } else { batchLoad = invokeListBatchLoader(keys, environment); } @@ -492,10 +506,72 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade }); } + private CompletableFuture> invokeBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + Subscriber subscriber = ReactiveSupport.batchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof BatchPublisherWithContext) { + //noinspection unchecked + BatchPublisherWithContext loadFunction = (BatchPublisherWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, environment); + } else { + loadFunction.load(keys, subscriber, environment); + } + } else { + //noinspection unchecked + BatchPublisher loadFunction = (BatchPublisher) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null); + } else { + loadFunction.load(keys, subscriber); + } + } + return loadResult; + } + + private CompletableFuture> invokeMappedBatchPublisher(List keys, List keyContexts, List> queuedFutures, BatchLoaderEnvironment environment) { + CompletableFuture> loadResult = new CompletableFuture<>(); + Subscriber> subscriber = ReactiveSupport.mappedBatchSubscriber(loadResult, keys, keyContexts, queuedFutures, helperIntegration()); + Set setOfKeys = new LinkedHashSet<>(keys); + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); + if (batchLoadFunction instanceof MappedBatchPublisherWithContext) { + //noinspection unchecked + MappedBatchPublisherWithContext loadFunction = (MappedBatchPublisherWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(keys, subscriber, environment); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, environment); + } else { + loadFunction.load(keys, subscriber, environment); + } + } else { + //noinspection unchecked + MappedBatchPublisher loadFunction = (MappedBatchPublisher) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchPublisherCall loadCall = () -> loadFunction.load(setOfKeys, subscriber); + batchLoaderScheduler.scheduleBatchPublisher(loadCall, keys, null); + } else { + loadFunction.load(setOfKeys, subscriber); + } + } + return loadResult; + } + private boolean isMapLoader() { return batchLoadFunction instanceof MappedBatchLoader || batchLoadFunction instanceof MappedBatchLoaderWithContext; } + private boolean isPublisher() { + return batchLoadFunction instanceof BatchPublisher; + } + + private boolean isMappedPublisher() { + return batchLoadFunction instanceof MappedBatchPublisher; + } + int dispatchDepth() { synchronized (dataLoader) { return loaderQueue.size(); @@ -546,4 +622,23 @@ private CompletableFuture> setToValueCache(List assembledValues, List private static DispatchResult emptyDispatchResult() { return (DispatchResult) EMPTY_DISPATCH_RESULT; } + + private ReactiveSupport.HelperIntegration helperIntegration() { + return new ReactiveSupport.HelperIntegration<>() { + @Override + public StatisticsCollector getStats() { + return stats; + } + + @Override + public void clearCacheView(K key) { + dataLoader.clear(key); + } + + @Override + public void clearCacheEntriesOnExceptions(List keys) { + possiblyClearCacheEntriesOnExceptions(keys); + } + }; + } } diff --git a/src/main/java/org/dataloader/MappedBatchPublisher.java b/src/main/java/org/dataloader/MappedBatchPublisher.java new file mode 100644 index 0000000..754ee52 --- /dev/null +++ b/src/main/java/org/dataloader/MappedBatchPublisher.java @@ -0,0 +1,30 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.Map; +import java.util.Set; + +/** + * A function that is invoked for batch loading a stream of data values indicated by the provided list of keys. + *

+ * The function must call the provided {@link Subscriber} to process the key/value pairs it has retrieved to allow + * the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available + * (rather than when all values have been retrieved). + * + * @param type parameter indicating the type of keys to use for data load requests. + * @param type parameter indicating the type of values returned + * @see MappedBatchLoader for the non-reactive version + */ +public interface MappedBatchPublisher { + /** + * Called to batch the provided keys into a stream of map entries of keys and values. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + */ + void load(Set keys, Subscriber> subscriber); +} diff --git a/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java new file mode 100644 index 0000000..2e94152 --- /dev/null +++ b/src/main/java/org/dataloader/MappedBatchPublisherWithContext.java @@ -0,0 +1,32 @@ +package org.dataloader; + +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.Map; + +/** + * This form of {@link MappedBatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object + * that encapsulates the calling context. A typical use case is passing in security credentials or database details + * for example. + *

+ * See {@link MappedBatchPublisher} for more details on the design invariants that you must implement in order to + * use this interface. + */ +public interface MappedBatchPublisherWithContext { + + /** + * Called to batch the provided keys into a stream of map entries of keys and values. + *

+ * The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides + * the values given the keys and then subscribe to it with the provided {@link Subscriber}. + *

+ * This is given an environment object to that maybe be useful during the call. A typical use case + * is passing in security credentials or database details for example. + * + * @param keys the collection of keys to load + * @param subscriber as values arrive you must call the subscriber for each value + * @param environment an environment object that can help with the call + */ + void load(List keys, Subscriber> subscriber, BatchLoaderEnvironment environment); +} diff --git a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java new file mode 100644 index 0000000..c2f5438 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -0,0 +1,104 @@ +package org.dataloader.reactive; + +import org.dataloader.Try; +import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; +import org.dataloader.stats.context.IncrementLoadErrorCountStatisticsContext; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static org.dataloader.impl.Assertions.assertState; + +/** + * The base class for our reactive subscriber support + * + * @param for two + */ +abstract class AbstractBatchSubscriber implements Subscriber { + + final CompletableFuture> valuesFuture; + final List keys; + final List callContexts; + final List> queuedFutures; + final ReactiveSupport.HelperIntegration helperIntegration; + + List clearCacheKeys = new ArrayList<>(); + List completedValues = new ArrayList<>(); + boolean onErrorCalled = false; + boolean onCompleteCalled = false; + + AbstractBatchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + this.valuesFuture = valuesFuture; + this.keys = keys; + this.callContexts = callContexts; + this.queuedFutures = queuedFutures; + this.helperIntegration = helperIntegration; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(keys.size()); + } + + @Override + public void onNext(T v) { + assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked."); + assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked."); + } + + @Override + public void onComplete() { + assertState(!onErrorCalled, () -> "onError has already been called; onComplete may not be invoked."); + onCompleteCalled = true; + } + + @Override + public void onError(Throwable throwable) { + assertState(!onCompleteCalled, () -> "onComplete has already been called; onError may not be invoked."); + onErrorCalled = true; + + helperIntegration.getStats().incrementBatchLoadExceptionCount(new IncrementBatchLoadExceptionCountStatisticsContext<>(keys, callContexts)); + } + + /* + * A value has arrived - how do we complete the future that's associated with it in a common way + */ + void onNextValue(K key, V value, Object callContext, List> futures) { + if (value instanceof Try) { + // we allow the batch loader to return a Try so we can better represent a computation + // that might have worked or not. + //noinspection unchecked + Try tryValue = (Try) value; + if (tryValue.isSuccess()) { + futures.forEach(f -> f.complete(tryValue.get())); + } else { + helperIntegration.getStats().incrementLoadErrorCount(new IncrementLoadErrorCountStatisticsContext<>(key, callContext)); + futures.forEach(f -> f.completeExceptionally(tryValue.getThrowable())); + clearCacheKeys.add(key); + } + } else { + futures.forEach(f -> f.complete(value)); + } + } + + Throwable unwrapThrowable(Throwable ex) { + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } + return ex; + } + + void possiblyClearCacheEntriesOnExceptions() { + helperIntegration.clearCacheEntriesOnExceptions(clearCacheKeys); + } +} diff --git a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java new file mode 100644 index 0000000..d0b8110 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java @@ -0,0 +1,86 @@ +package org.dataloader.reactive; + +import org.dataloader.impl.DataLoaderAssertionException; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys. The keys and values must be + * in index order. + *

+ * This is a reactive version of {@link org.dataloader.BatchLoader} + * + * @param the type of keys + * @param the type of values + */ +class BatchSubscriberImpl extends AbstractBatchSubscriber { + + private int idx = 0; + + BatchSubscriberImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // correctness (at the cost of speed). + @Override + public synchronized void onNext(V value) { + super.onNext(value); + + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, List.of(future)); + + completedValues.add(value); + idx++; + } + + + @Override + public synchronized void onComplete() { + super.onComplete(); + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } + } + } + possiblyClearCacheEntriesOnExceptions(); + valuesFuture.complete(completedValues); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + if (!future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java new file mode 100644 index 0000000..d56efa0 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java @@ -0,0 +1,103 @@ +package org.dataloader.reactive; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * This class can be used to subscribe to a {@link org.reactivestreams.Publisher} and then + * have the values it receives complete the data loader keys in a map lookup fashion. + *

+ * This is a reactive version of {@link org.dataloader.MappedBatchLoader} + * + * @param the type of keys + * @param the type of values + */ +class MappedBatchSubscriberImpl extends AbstractBatchSubscriber> { + + private final Map callContextByKey; + private final Map>> queuedFuturesByKey; + private final Map completedValuesByKey = new HashMap<>(); + + + MappedBatchSubscriberImpl( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + this.callContextByKey = new HashMap<>(); + this.queuedFuturesByKey = new HashMap<>(); + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture queuedFuture = queuedFutures.get(idx); + callContextByKey.put(key, callContext); + queuedFuturesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(queuedFuture); + } + } + + + @Override + public synchronized void onNext(Map.Entry entry) { + super.onNext(entry); + K key = entry.getKey(); + V value = entry.getValue(); + + Object callContext = callContextByKey.get(key); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + + onNextValue(key, value, callContext, futures); + + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } + } + + @Override + public synchronized void onComplete() { + super.onComplete(); + + possiblyClearCacheEntriesOnExceptions(); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); + + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (!future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } + } + } + valuesFuture.complete(values); + } + + @Override + public synchronized void onError(Throwable ex) { + super.onError(ex); + ex = unwrapThrowable(ex); + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + List> futures = queuedFuturesByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + // clear any cached view of this key because they all failed + helperIntegration.clearCacheView(key); + } + } + valuesFuture.completeExceptionally(ex); + } +} diff --git a/src/main/java/org/dataloader/reactive/ReactiveSupport.java b/src/main/java/org/dataloader/reactive/ReactiveSupport.java new file mode 100644 index 0000000..fc03bb0 --- /dev/null +++ b/src/main/java/org/dataloader/reactive/ReactiveSupport.java @@ -0,0 +1,45 @@ +package org.dataloader.reactive; + +import org.dataloader.stats.StatisticsCollector; +import org.reactivestreams.Subscriber; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class ReactiveSupport { + + public static Subscriber batchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new BatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + public static Subscriber> mappedBatchSubscriber( + CompletableFuture> valuesFuture, + List keys, + List callContexts, + List> queuedFutures, + ReactiveSupport.HelperIntegration helperIntegration + ) { + return new MappedBatchSubscriberImpl<>(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); + } + + /** + * Just some callbacks to the data loader code to do common tasks + * + * @param for keys + */ + public interface HelperIntegration { + + StatisticsCollector getStats(); + + void clearCacheView(K key); + + void clearCacheEntriesOnExceptions(List keys); + } +} diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java index 7cddd54..e7e95d9 100644 --- a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -5,6 +5,8 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedBatchPublisher; +import org.dataloader.BatchPublisher; import java.util.List; import java.util.Map; @@ -42,6 +44,13 @@ interface ScheduledMappedBatchLoaderCall { CompletionStage> invoke(); } + /** + * This represents a callback that will invoke a {@link BatchPublisher} or {@link MappedBatchPublisher} function under the covers + */ + interface ScheduledBatchPublisherCall { + void invoke(); + } + /** * This is called to schedule a {@link BatchLoader} call. * @@ -71,4 +80,16 @@ interface ScheduledMappedBatchLoaderCall { * @return a promise to the values that come from the {@link BatchLoader} */ CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link BatchPublisher} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link BatchPublisher} to proceed. + * @param keys this is the list of keys that will be passed to the {@link BatchPublisher}. + * This is provided only for informative reasons and, you can't change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link BatchPublisher} call + * @param the key type + */ + void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment); } diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index d25dfa7..a20c0ea 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -1,11 +1,13 @@ import org.dataloader.BatchLoader; import org.dataloader.BatchLoaderEnvironment; import org.dataloader.BatchLoaderWithContext; +import org.dataloader.BatchPublisher; import org.dataloader.CacheMap; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; import org.dataloader.MappedBatchLoaderWithContext; +import org.dataloader.MappedBatchPublisher; import org.dataloader.Try; import org.dataloader.fixtures.SecurityCtx; import org.dataloader.fixtures.User; @@ -15,6 +17,7 @@ import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import org.reactivestreams.Subscriber; import java.time.Duration; import java.util.ArrayList; @@ -171,7 +174,7 @@ private void tryExample() { } } - private void tryBatcLoader() { + private void tryBatchLoader() { DataLoader dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader>() { @Override public CompletionStage>> load(List keys) { @@ -187,6 +190,26 @@ public CompletionStage>> load(List keys) { }); } + private void batchPublisher() { + BatchPublisher batchPublisher = new BatchPublisher() { + @Override + public void load(List userIds, Subscriber userSubscriber) { + userManager.publishUsersById(userIds, userSubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); + } + + private void mappedBatchPublisher() { + MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { + @Override + public void load(Set userIds, Subscriber> userEntrySubscriber) { + userManager.publishUsersById(userIds, userEntrySubscriber); + } + }; + DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); + } + DataLoader userDataLoader; private void clearCacheOnError() { @@ -304,6 +327,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(10); + scheduledCall.invoke(); + } }; } diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index db71c1e..1ce34ea 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -16,12 +16,19 @@ package org.dataloader; +import org.awaitility.Duration; import org.dataloader.fixtures.CustomCacheMap; import org.dataloader.fixtures.JsonObject; -import org.dataloader.fixtures.TestKit; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; +import org.dataloader.fixtures.parameterized.ListDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedDataLoaderFactory; +import org.dataloader.fixtures.parameterized.MappedPublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.PublisherDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestDataLoaderFactory; +import org.dataloader.fixtures.parameterized.TestReactiveDataLoaderFactory; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.impl.DataLoaderAssertionException; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -46,11 +53,16 @@ import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.*; import static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderFactory.newDataLoader; import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; import static org.dataloader.DataLoaderOptions.newOptions; -import static org.dataloader.fixtures.TestKit.futureError; +import static org.dataloader.fixtures.TestKit.areAllDone; import static org.dataloader.fixtures.TestKit.listFrom; import static org.dataloader.impl.CompletableFutureKit.cause; import static org.hamcrest.MatcherAssert.assertThat; @@ -59,6 +71,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.fail; /** * Tests for {@link DataLoader}. @@ -99,7 +112,7 @@ public void basic_map_batch_loading() { mapOfResults.put(k, k); } }); - return CompletableFuture.completedFuture(mapOfResults); + return completedFuture(mapOfResults); }; DataLoader loader = DataLoaderFactory.newMappedDataLoader(evensOnlyMappedBatchLoader); @@ -419,7 +432,7 @@ public void should_Allow_priming_the_cache_with_a_future(TestDataLoaderFactory f List> loadCalls = new ArrayList<>(); DataLoader identityLoader = factory.idLoader(new DataLoaderOptions(), loadCalls); - DataLoader dlFluency = identityLoader.prime("A", CompletableFuture.completedFuture("A")); + DataLoader dlFluency = identityLoader.prime("A", completedFuture("A")); assertThat(dlFluency, equalTo(identityLoader)); CompletableFuture future1 = identityLoader.load("A"); @@ -727,10 +740,10 @@ public void should_work_with_duplicate_keys_when_caching_disabled(TestDataLoader assertThat(future1.get(), equalTo("A")); assertThat(future2.get(), equalTo("B")); assertThat(future3.get(), equalTo("A")); - if (factory instanceof ListDataLoaderFactory) { - assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); - } else { + if (factory instanceof MappedDataLoaderFactory || factory instanceof MappedPublisherDataLoaderFactory) { assertThat(loadCalls, equalTo(singletonList(asList("A", "B")))); + } else { + assertThat(loadCalls, equalTo(singletonList(asList("A", "B", "A")))); } } @@ -987,7 +1000,7 @@ public void batches_multiple_requests_with_max_batch_size(TestDataLoaderFactory identityLoader.dispatch(); - CompletableFuture.allOf(f1, f2, f3).join(); + allOf(f1, f2, f3).join(); assertThat(f1.join(), equalTo(1)); assertThat(f2.join(), equalTo(2)); @@ -1030,13 +1043,13 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa AtomicBoolean v4Called = new AtomicBoolean(); - CompletableFuture.supplyAsync(nullValue).thenAccept(v1 -> { + supplyAsync(nullValue).thenAccept(v1 -> { identityLoader.load("a"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v2 -> { + supplyAsync(nullValue).thenAccept(v2 -> { identityLoader.load("b"); - CompletableFuture.supplyAsync(nullValue).thenAccept(v3 -> { + supplyAsync(nullValue).thenAccept(v3 -> { identityLoader.load("c"); - CompletableFuture.supplyAsync(nullValue).thenAccept( + supplyAsync(nullValue).thenAccept( v4 -> { identityLoader.load("d"); v4Called.set(true); @@ -1053,12 +1066,101 @@ public void should_Batch_loads_occurring_within_futures(TestDataLoaderFactory fa singletonList(asList("a", "b", "c", "d")))); } + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void should_blowup_after_N_keys(TestDataLoaderFactory factory) { + if (!(factory instanceof TestReactiveDataLoaderFactory)) { + return; + } + // + // if we blow up after emitting N keys, the N keys should work but the rest of the keys + // should be exceptional + DataLoader identityLoader = ((TestReactiveDataLoaderFactory) factory).idLoaderBlowsUpsAfterN(3, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load(1); + CompletableFuture cf2 = identityLoader.load(2); + CompletableFuture cf3 = identityLoader.load(3); + CompletableFuture cf4 = identityLoader.load(4); + CompletableFuture cf5 = identityLoader.load(5); + identityLoader.dispatch(); + await().until(cf5::isDone); + + assertThat(cf1.join(), equalTo(1)); + assertThat(cf2.join(), equalTo(2)); + assertThat(cf3.join(), equalTo(3)); + assertThat(cf4.isCompletedExceptionally(), is(true)); + assertThat(cf5.isCompletedExceptionally(), is(true)); + + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void when_values_size_are_less_then_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but are only given 2 back say + // + DataLoader identityLoader = factory.onlyReturnsNValues(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); + + if (factory instanceof ListDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else if (factory instanceof PublisherDataLoaderFactory) { + // some have completed progressively but the other never did + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + // with the maps it's ok to have fewer results + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo(null)); + assertThat(cf4.join(), equalTo(null)); + } + } + + @ParameterizedTest + @MethodSource("dataLoaderFactories") + public void when_values_size_are_more_then_key_size(TestDataLoaderFactory factory) { + // + // what happens if we want 4 values but only given 6 back say + // + DataLoader identityLoader = factory.idLoaderReturnsTooMany(2, new DataLoaderOptions(), new ArrayList<>()); + CompletableFuture cf1 = identityLoader.load("A"); + CompletableFuture cf2 = identityLoader.load("B"); + CompletableFuture cf3 = identityLoader.load("C"); + CompletableFuture cf4 = identityLoader.load("D"); + identityLoader.dispatch(); + await().atMost(Duration.FIVE_SECONDS).until(() -> areAllDone(cf1, cf2, cf3, cf4)); + + + if (factory instanceof ListDataLoaderFactory) { + assertThat(cause(cf1), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf2), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf3), instanceOf(DataLoaderAssertionException.class)); + assertThat(cause(cf4), instanceOf(DataLoaderAssertionException.class)); + } else { + assertThat(cf1.join(), equalTo("A")); + assertThat(cf2.join(), equalTo("B")); + assertThat(cf3.join(), equalTo("C")); + assertThat(cf4.join(), equalTo("D")); + } + } + @Test public void can_call_a_loader_from_a_loader() throws Exception { List> deepLoadCalls = new ArrayList<>(); DataLoader deepLoader = newDataLoader(keys -> { deepLoadCalls.add(keys); - return CompletableFuture.completedFuture(keys); + return completedFuture(keys); }); List> aLoadCalls = new ArrayList<>(); @@ -1078,7 +1180,7 @@ public void can_call_a_loader_from_a_loader() throws Exception { CompletableFuture b1 = bLoader.load("B1"); CompletableFuture b2 = bLoader.load("B2"); - CompletableFuture.allOf( + allOf( aLoader.dispatch(), deepLoader.dispatch(), bLoader.dispatch(), @@ -1104,11 +1206,10 @@ public void can_call_a_loader_from_a_loader() throws Exception { public void should_allow_composition_of_data_loader_calls() { UserManager userManager = new UserManager(); - BatchLoader userBatchLoader = userIds -> CompletableFuture - .supplyAsync(() -> userIds - .stream() - .map(userManager::loadUserById) - .collect(Collectors.toList())); + BatchLoader userBatchLoader = userIds -> supplyAsync(() -> userIds + .stream() + .map(userManager::loadUserById) + .collect(Collectors.toList())); DataLoader userLoader = newDataLoader(userBatchLoader); AtomicBoolean gandalfCalled = new AtomicBoolean(false); @@ -1144,127 +1245,21 @@ private static CacheKey getJsonObjectCacheMapFn() { .collect(Collectors.joining()); } - private static Stream dataLoaderFactories() { - return Stream.of( - Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())), - Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory())) - ); - } - - public interface TestDataLoaderFactory { - DataLoader idLoader(DataLoaderOptions options, List> loadCalls); - DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); - DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); - DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); - } - - private static class ListDataLoaderFactory implements TestDataLoaderFactory { - @Override - public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - @SuppressWarnings("unchecked") - List values = keys.stream() - .map(k -> (V) k) - .collect(Collectors.toList()); - return CompletableFuture.completedFuture(values); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps( - DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - return TestKit.futureError(); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); - return CompletableFuture.completedFuture(errors); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls) { - return newDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - List errors = new ArrayList<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errors.add(key); - } else { - errors.add(new IllegalStateException("Error")); - } - } - return CompletableFuture.completedFuture(errors); - }, options); - } - } - - private static class MappedDataLoaderFactory implements TestDataLoaderFactory { - - @Override - public DataLoader idLoader( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader((keys) -> { - loadCalls.add(new ArrayList<>(keys)); - Map map = new HashMap<>(); - //noinspection unchecked - keys.forEach(k -> map.put(k, (V) k)); - return CompletableFuture.completedFuture(map); - }, options); - } - - @Override - public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader((keys) -> { - loadCalls.add(new ArrayList<>(keys)); - return futureError(); - }, options); - } - - @Override - public DataLoader idLoaderAllExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - Map errorByKey = new HashMap<>(); - keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); - return CompletableFuture.completedFuture(errorByKey); - }, options); - } - - @Override - public DataLoader idLoaderOddEvenExceptions( - DataLoaderOptions options, List> loadCalls) { - return newMappedDataLoader(keys -> { - loadCalls.add(new ArrayList<>(keys)); - - Map errorByKey = new HashMap<>(); - for (Integer key : keys) { - if (key % 2 == 0) { - errorByKey.put(key, key); - } else { - errorByKey.put(key, new IllegalStateException("Error")); - } - } - return CompletableFuture.completedFuture(errorByKey); - }, options); - } - } - private static class ThrowingCacheMap extends CustomCacheMap { + @Override public CompletableFuture get(String key) { throw new RuntimeException("Cache implementation failed."); } } + + private static Stream dataLoaderFactories() { + return Stream.of( + Arguments.of(Named.of("List DataLoader", new ListDataLoaderFactory())), + Arguments.of(Named.of("Mapped DataLoader", new MappedDataLoaderFactory())), + Arguments.of(Named.of("Publisher DataLoader", new PublisherDataLoaderFactory())), + Arguments.of(Named.of("Mapped Publisher DataLoader", new MappedPublisherDataLoaderFactory())) + ); + } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index adffb06..c22988d 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -131,4 +131,13 @@ public static Set asSet(T... elements) { public static Set asSet(Collection elements) { return new LinkedHashSet<>(elements); } + + public static boolean areAllDone(CompletableFuture... cfs) { + for (CompletableFuture cf : cfs) { + if (! cf.isDone()) { + return false; + } + } + return true; + } } diff --git a/src/test/java/org/dataloader/fixtures/UserManager.java b/src/test/java/org/dataloader/fixtures/UserManager.java index 24fee0d..4fed3f7 100644 --- a/src/test/java/org/dataloader/fixtures/UserManager.java +++ b/src/test/java/org/dataloader/fixtures/UserManager.java @@ -1,5 +1,8 @@ package org.dataloader.fixtures; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; + import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -52,6 +55,14 @@ public List loadUsersById(List userIds) { return userIds.stream().map(this::loadUserById).collect(Collectors.toList()); } + public void publishUsersById(List userIds, Subscriber userSubscriber) { + Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber); + } + + public void publishUsersById(Set userIds, Subscriber> userEntrySubscriber) { + Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber); + } + public Map loadMapOfUsersByIds(SecurityCtx callCtx, Set userIds) { Map map = new HashMap<>(); userIds.forEach(userId -> { diff --git a/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java new file mode 100644 index 0000000..ee1f1d7 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/ListDataLoaderFactory.java @@ -0,0 +1,79 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.fixtures.TestKit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newDataLoader; + +public class ListDataLoaderFactory implements TestDataLoaderFactory { + @Override + public DataLoader idLoader(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps( + DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return TestKit.futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = keys.stream().map(k -> new IllegalStateException("Error")).collect(Collectors.toList()); + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(key); + } else { + errors.add(new IllegalStateException("Error")); + } + } + return completedFuture(errors); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + return completedFuture(keys.subList(0, N)); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + return completedFuture(l); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java new file mode 100644 index 0000000..8f41441 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedDataLoaderFactory.java @@ -0,0 +1,95 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.futureError; + +public class MappedDataLoaderFactory implements TestDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + return completedFuture(map); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader((keys) -> { + loadCalls.add(new ArrayList<>(keys)); + return futureError(); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + Map errorByKey = new HashMap<>(); + keys.forEach(k -> errorByKey.put(k, new IllegalStateException("Error"))); + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, key); + } else { + errorByKey.put(key, new IllegalStateException("Error")); + } + } + return completedFuture(errorByKey); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + Map collect = List.copyOf(keys).subList(0, N).stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedDataLoader(keys -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Map collect = l.stream().collect(Collectors.toMap( + k -> k, v -> v + )); + return completedFuture(collect); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java new file mode 100644 index 0000000..9c92330 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/MappedPublisherDataLoaderFactory.java @@ -0,0 +1,108 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedPublisherDataLoaderWithTry; + +public class MappedPublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Map map = new HashMap<>(); + keys.forEach(k -> map.put(k, k)); + Flux.fromIterable(map.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.>error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream>> failures = keys.stream().map(k -> Map.entry(k, Try.failed(new IllegalStateException("Error")))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + Map> errorByKey = new HashMap<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errorByKey.put(key, Try.succeeded(key)); + } else { + errorByKey.put(key, Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errorByKey.entrySet()).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.stream().limit(N).collect(toList()); + Flux> subFlux = Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.stream().limit(N).collect(toList()); + Flux.fromIterable(nKeys).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newMappedPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l).map(k -> Map.entry(k, k)) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java new file mode 100644 index 0000000..d75ff38 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/PublisherDataLoaderFactory.java @@ -0,0 +1,100 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.Try; +import reactor.core.publisher.Flux; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; + +import static org.dataloader.DataLoaderFactory.newPublisherDataLoader; +import static org.dataloader.DataLoaderFactory.newPublisherDataLoaderWithTry; + +public class PublisherDataLoaderFactory implements TestDataLoaderFactory, TestReactiveDataLoaderFactory { + + @Override + public DataLoader idLoader( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.fromIterable(keys).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Flux.error(new IllegalStateException("Error")).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderAllExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + Stream> failures = keys.stream().map(k -> Try.failed(new IllegalStateException("Error"))); + Flux.fromStream(failures).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderOddEvenExceptions( + DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoaderWithTry((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List> errors = new ArrayList<>(); + for (Integer key : keys) { + if (key % 2 == 0) { + errors.add(Try.succeeded(key)); + } else { + errors.add(Try.failed(new IllegalStateException("Error"))); + } + } + Flux.fromIterable(errors).subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux subFlux = Flux.fromIterable(nKeys); + subFlux.concatWith(Flux.error(new IllegalStateException("Error"))) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List nKeys = keys.subList(0, N); + Flux.fromIterable(nKeys) + .subscribe(subscriber); + }, options); + } + + @Override + public DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls) { + return newPublisherDataLoader((keys, subscriber) -> { + loadCalls.add(new ArrayList<>(keys)); + + List l = new ArrayList<>(keys); + for (int i = 0; i < howManyMore; i++) { + l.add("extra-" + i); + } + + Flux.fromIterable(l) + .subscribe(subscriber); + }, options); + } +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java new file mode 100644 index 0000000..8c1bc22 --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestDataLoaderFactory.java @@ -0,0 +1,22 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public interface TestDataLoaderFactory { + DataLoader idLoader(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderBlowsUps(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderAllExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader idLoaderOddEvenExceptions(DataLoaderOptions options, List> loadCalls); + + DataLoader onlyReturnsNValues(int N, DataLoaderOptions options, ArrayList loadCalls); + + DataLoader idLoaderReturnsTooMany(int howManyMore, DataLoaderOptions options, ArrayList loadCalls); +} diff --git a/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java new file mode 100644 index 0000000..d45932c --- /dev/null +++ b/src/test/java/org/dataloader/fixtures/parameterized/TestReactiveDataLoaderFactory.java @@ -0,0 +1,11 @@ +package org.dataloader.fixtures.parameterized; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; + +import java.util.Collection; +import java.util.List; + +public interface TestReactiveDataLoaderFactory { + DataLoader idLoaderBlowsUpsAfterN(int N, DataLoaderOptions options, List> loadCalls); +} diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java index 274ed8c..ff9ec8e 100644 --- a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -36,6 +36,11 @@ public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderC public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { return scheduledCall.invoke(); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + scheduledCall.invoke(); + } }; private BatchLoaderScheduler delayedScheduling(int ms) { @@ -56,6 +61,12 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + snooze(ms); + scheduledCall.invoke(); + } }; } @@ -139,6 +150,15 @@ public CompletionStage> scheduleMappedBatchLoader(ScheduledMapp return scheduledCall.invoke(); }).thenCompose(Function.identity()); } + + @Override + public void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + CompletableFuture.supplyAsync(() -> { + snooze(10); + scheduledCall.invoke(); + return null; + }); + } }; DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler);