Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweaked readme #161

Merged
merged 2 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ For example, let's assume you want to load users from a database, you could prob

### Returning a stream of results from your batch publisher

It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.
It may be that your batch loader function can use a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream.

For example, let's say you wanted to load many users from a service without forcing the service to load all
users into its memory (which may exert considerable pressure on it).
Expand All @@ -299,29 +299,37 @@ A `org.dataloader.BatchPublisher` may be used to load this data:
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
Publisher<User> userResults = userManager.publishUsersById(userIds);
userResults.subscribe(userSubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);

// ...
```

Rather than waiting for all values to be returned, this `DataLoader` will complete
Rather than waiting for all user values to be returned on one batch, this `DataLoader` will complete
the `CompletableFuture<User>` returned by `Dataloader#load(Long)` as each value is
processed.
published.

This pattern means that data loader values can (in theory) be satisfied more quickly than if we wait for
all results in the batch to be retrieved and hence the overall result may finish more quickly.

If an exception is thrown, the remaining futures yet to be completed are completed
exceptionally.

You *MUST* ensure that the values are streamed in the same order as the keys provided,
with the same cardinality (i.e. the number of values must match the number of keys).

Failing to do so will result in incorrect data being returned from `DataLoader#load`.

`BatchPublisher` is the reactive version of `BatchLoader`.


### Returning a mapped stream of results from your batch publisher

Your publisher may not necessarily return values in the same order in which it processes keys.
Your publisher may not necessarily return values in the same order in which it processes keys and it
may not be able to find a value for each key presented.

For example, let's say your batch publisher function loads user data which is spread across shards,
with some shards responding more quickly than others.
Expand All @@ -332,7 +340,8 @@ In instances like these, `org.dataloader.MappedBatchPublisher` can be used.
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
Publisher<Map.Entry<Long, User>> userEntries = userManager.publishUsersById(userIds);
userEntries.subscribe(userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
Expand All @@ -346,6 +355,8 @@ exceptionally.
Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys,
or even the same number of values.

`MappedBatchPublisher` is the reactive version of `MappedBatchLoader`.

### Error object is not a thing in a type safe Java world

In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected
Expand Down
8 changes: 6 additions & 2 deletions src/test/java/ReadmeExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.ThreadLocalStatisticsCollector;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -194,7 +196,8 @@ private void batchPublisher() {
BatchPublisher<Long, User> batchPublisher = new BatchPublisher<Long, User>() {
@Override
public void load(List<Long> userIds, Subscriber<User> userSubscriber) {
userManager.publishUsersById(userIds, userSubscriber);
Publisher<User> userResults = userManager.publishUsersById(userIds);
userResults.subscribe(userSubscriber);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to show how you might create a Publisher and then subscribe to it. This is more clear in an instructional sense

};
DataLoader<Long, User> userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher);
Expand All @@ -204,7 +207,8 @@ private void mappedBatchPublisher() {
MappedBatchPublisher<Long, User> mappedBatchPublisher = new MappedBatchPublisher<Long, User>() {
@Override
public void load(Set<Long> userIds, Subscriber<Map.Entry<Long, User>> userEntrySubscriber) {
userManager.publishUsersById(userIds, userEntrySubscriber);
Publisher<Map.Entry<Long, User>> userEntries = userManager.publishUsersById(userIds);
userEntries.subscribe(userEntrySubscriber);
}
};
DataLoader<Long, User> userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher);
Expand Down
9 changes: 5 additions & 4 deletions src/test/java/org/dataloader/fixtures/UserManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.dataloader.fixtures;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -55,12 +56,12 @@ public List<User> loadUsersById(List<Long> userIds) {
return userIds.stream().map(this::loadUserById).collect(Collectors.toList());
}

public void publishUsersById(List<Long> userIds, Subscriber<? super User> userSubscriber) {
Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber);
public Publisher<User> publishUsersById(List<Long> userIds) {
return Flux.fromIterable(loadUsersById(userIds));
}

public void publishUsersById(Set<Long> userIds, Subscriber<? super Map.Entry<Long, User>> userEntrySubscriber) {
Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber);
public Publisher<Map.Entry<Long, User>> publishUsersById(Set<Long> userIds) {
return Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet());
}

public Map<Long, User> loadMapOfUsersByIds(SecurityCtx callCtx, Set<Long> userIds) {
Expand Down
Loading