diff --git a/jooq-async-api/src/main/java/fr/maif/jooq/PgAsyncPool.java b/jooq-async-api/src/main/java/fr/maif/jooq/PgAsyncPool.java index f564945..b59cb1d 100644 --- a/jooq-async-api/src/main/java/fr/maif/jooq/PgAsyncPool.java +++ b/jooq-async-api/src/main/java/fr/maif/jooq/PgAsyncPool.java @@ -20,7 +20,11 @@ default CompletionStage inTransaction(Function t.commit().thenApply(__ -> r)) .exceptionallyCompose(e -> - t.rollback().thenCompose(__ -> CompletableFuture.failedStage(e)) + t.rollback().thenCompose(__ -> { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(e); + return cf; + }) ) ); } diff --git a/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/AbstractReactivePgAsyncClient.java b/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/AbstractReactivePgAsyncClient.java index bb2970b..83a4cb3 100644 --- a/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/AbstractReactivePgAsyncClient.java +++ b/jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/AbstractReactivePgAsyncClient.java @@ -70,11 +70,11 @@ public CompletionStage> queryOne(Function return rawPreparedQuery(queryFunction).thenCompose(res -> { switch (res.size()) { case 0: - return CompletableFuture.completedStage(Option.none()); + return completedStage(Option.none()); case 1: - return CompletableFuture.completedStage(Option.of(new ReactiveRowQueryResult(res.iterator().next()))); + return completedStage(Option.of(new ReactiveRowQueryResult(res.iterator().next()))); default: - return CompletableFuture.failedStage(new TooManyRowsException(String.format("Found more than one row: %d", res.size()))); + return failedStage(new TooManyRowsException(String.format("Found more than one row: %d", res.size()))); } }); } @@ -96,9 +96,9 @@ public CompletionStage execute(Function qu public CompletionStage executeBatch(Function> queryFunction) { List queries = queryFunction.apply(DSL.using(configuration)); if (queries.isEmpty()) { - return CompletableFuture.completedStage(0L); + return completedStage(0L); } - return queries.foldLeft(CompletableFuture.completedStage(0L), (acc, query) -> + return queries.foldLeft(completedStage(0L), (acc, query) -> acc.thenCompose(count -> { log(query); String preparedQuery = toPreparedQuery(query); @@ -113,7 +113,7 @@ public CompletionStage executeBatch(Function executeBatch(Function queryFunction, List> values) { if (values.isEmpty()) { - return CompletableFuture.completedStage(0L); + return completedStage(0L); } CompletableFuture> rowFuture = new CompletableFuture<>(); try { @@ -274,4 +274,16 @@ JsonNode readJson(String json) { throw new RuntimeException("Error parsing json "+json, e); } } + + static CompletionStage completedStage(T value) { + CompletableFuture cf = new CompletableFuture<>(); + cf.complete(value); + return cf; + } + + static CompletionStage failedStage(Throwable throwable) { + CompletableFuture cf = new CompletableFuture<>(); + cf.completeExceptionally(throwable); + return cf; + } }