Skip to content

Commit

Permalink
Merge pull request #151 from graphql-java/reactive-streams-branch
Browse files Browse the repository at this point in the history
A PR for reactive streams support
  • Loading branch information
bbakerman authored May 27, 2024
2 parents 20b3c01 + c3e6ee5 commit d44070a
Show file tree
Hide file tree
Showing 24 changed files with 1,549 additions and 144 deletions.
66 changes: 66 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,66 @@ For example, let's assume you want to load users from a database, you could prob
// ...
```

### Returning a stream of results from your batch publisher

It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.

For example, let's say you wanted to load many users from a service without forcing the service to load all
users into its memory (which may exert considerable pressure on it).

A `org.dataloader.BatchPublisher` may be used to load this data:

```java
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);

// ...
```

Rather than waiting for all values to be returned, this `DataLoader` will complete
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
processed.

If an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

You *MUST* ensure that the values are streamed in the same order as the keys provided,
with the same cardinality (i.e. the number of values must match the number of keys).
Failing to do so will result in incorrect data being returned from `DataLoader#load`.


### Returning a mapped stream of results from your batch publisher

Your publisher may not necessarily return values in the same order in which it processes keys.

For example, let's say your batch publisher function loads user data which is spread across shards,
with some shards responding more quickly than others.

In instances like these, `org.dataloader.MappedBatchPublisher` can be used.

```java
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);

// ...
```

Like the `BatchPublisher`, if an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
or even the same number of values.

### Error object is not a thing in a type safe Java world

In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected
Expand Down Expand Up @@ -541,6 +601,12 @@ The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invo
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}

@Override
public <K> void scheduleBatchPublisher(ScheduledBatchPublisherCall scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
snooze(10);
scheduledCall.invoke();
}
};
```

Expand Down
8 changes: 7 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def getDevelopmentVersion() {
version
}

def slf4jVersion = '1.7.30'
def releaseVersion = System.env.RELEASE_VERSION
version = releaseVersion ? releaseVersion : getDevelopmentVersion()
group = 'com.graphql-java'
Expand Down Expand Up @@ -64,16 +63,23 @@ jar {
}
}

def slf4jVersion = '1.7.30'
def reactiveStreamsVersion = '1.0.3'

dependencies {
api 'org.slf4j:slf4j-api:' + slf4jVersion
api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion

testImplementation 'org.slf4j:slf4j-simple:' + slf4jVersion
testImplementation 'org.awaitility:awaitility:2.0.0'
testImplementation "org.hamcrest:hamcrest:2.2"
testImplementation 'io.projectreactor:reactor-core:3.6.6'
testImplementation 'com.github.ben-manes.caffeine:caffeine:2.9.0'
testImplementation platform('org.junit:junit-bom:5.10.2')
testImplementation 'org.junit.jupiter:junit-jupiter-api'
testImplementation 'org.junit.jupiter:junit-jupiter-params'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testImplementation 'io.projectreactor:reactor-core:3.6.6'
}

task sourcesJar(type: Jar) {
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/org/dataloader/BatchPublisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.dataloader;

import org.reactivestreams.Subscriber;

import java.util.List;

/**
* A function that is invoked for batch loading a stream of data values indicated by the provided list of keys.
* <p>
* The function <b>must</b> call the provided {@link Subscriber} to process the values it has retrieved to allow
* the future returned by {@link DataLoader#load(Object)} to complete as soon as the individual value is available
* (rather than when all values have been retrieved).
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
*
* @param <K> type parameter indicating the type of keys to use for data load requests.
* @param <V> type parameter indicating the type of values returned
* @see BatchLoader for the non-reactive version
*/
public interface BatchPublisher<K, V> {
/**
* Called to batch the provided keys into a stream of values. You <b>must</b> provide
* the same number of values as there as keys, and they <b>must</b> be in the order of the keys.
* <p>
* The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides
* the values given the keys and then subscribe to it with the provided {@link Subscriber}.
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
*
* @param keys the collection of keys to load
* @param subscriber as values arrive you must call the subscriber for each value
*/
void load(List<K> keys, Subscriber<V> subscriber);
}
34 changes: 34 additions & 0 deletions src/main/java/org/dataloader/BatchPublisherWithContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.dataloader;

import org.reactivestreams.Subscriber;

import java.util.List;

/**
* This form of {@link BatchPublisher} is given a {@link org.dataloader.BatchLoaderEnvironment} object
* that encapsulates the calling context. A typical use case is passing in security credentials or database details
* for example.
* <p>
* See {@link BatchPublisher} for more details on the design invariants that you must implement in order to
* use this interface.
*/
public interface BatchPublisherWithContext<K, V> {
/**
* Called to batch the provided keys into a stream of values. You <b>must</b> provide
* the same number of values as there as keys, and they <b>must</b> be in the order of the keys.
* <p>
* The idiomatic approach would be to create a reactive {@link org.reactivestreams.Publisher} that provides
* the values given the keys and then subscribe to it with the provided {@link Subscriber}.
* <p>
* <b>NOTE:</b> It is <b>required</b> that {@link Subscriber#onNext(Object)} is invoked on each value in the same order as
* the provided keys and that you provide a value for every key provided.
* <p>
* This is given an environment object to that maybe be useful during the call. A typical use case
* is passing in security credentials or database details for example.
*
* @param keys the collection of keys to load
* @param subscriber as values arrive you must call the subscriber for each value
* @param environment an environment object that can help with the call
*/
void load(List<K> keys, Subscriber<V> subscriber, BatchLoaderEnvironment environment);
}
Loading

0 comments on commit d44070a

Please sign in to comment.