diff --git a/jooq-async-jdbc/src/main/java/fr/maif/jooq/jdbc/JdbcPgAsyncTransaction.java b/jooq-async-jdbc/src/main/java/fr/maif/jooq/jdbc/JdbcPgAsyncTransaction.java index 7cc3266..1d3d6ac 100644 --- a/jooq-async-jdbc/src/main/java/fr/maif/jooq/jdbc/JdbcPgAsyncTransaction.java +++ b/jooq-async-jdbc/src/main/java/fr/maif/jooq/jdbc/JdbcPgAsyncTransaction.java @@ -69,7 +69,7 @@ public CompletionStage rollback() { @Override public Publisher stream(Integer fetchSize, Function> queryFunction) { return Flux - .fromIterable(() -> queryFunction.apply(client).stream().iterator()) + .fromIterable(() -> queryFunction.apply(client).fetchSize(fetchSize).stream().iterator()) .publishOn(Schedulers.parallel()) .map(JooqQueryResult::new); } diff --git a/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/ReactivePgAsyncTransaction.java b/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/ReactivePgAsyncTransaction.java index 1531abd..ffe40e1 100644 --- a/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/ReactivePgAsyncTransaction.java +++ b/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/ReactivePgAsyncTransaction.java @@ -60,7 +60,7 @@ public Flux stream(Integer fetchSize, Function Mono.just(List.empty()) .expand(results -> { if (first.getAndSet(false) || cursor.hasMore()) { - return Mono.fromCompletionStage(cursor.read(500).map(rs -> + return Mono.fromCompletionStage(cursor.read(fetchSize).map(rs -> List.ofAll(rs) .map(ReactiveRowQueryResult::new) .map(r -> (QueryResult)r)