From 400f70fa2ccf19ac129e750b51a12cdbeb4bf733 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Fri, 7 Jun 2024 15:04:26 +0200 Subject: [PATCH] [SDCISA-16147, #583] Fix another make-json-EventLoop-blocker Related: SDCISA-15633, SDCISA-15833, SDCISA-16147, https://github.com/swisspost/vertx-redisques/issues/170, https://github.com/swisspost/vertx-redisques/pull/177, https://github.com/swisspost/vertx-rest-storage/pull/186, https://github.com/swisspost/gateleen/pull/577, https://github.com/swisspost/vertx-redisques/pull/181, https://github.com/swisspost/gateleen/issues/493, https://github.com/swisspost/vertx-rest-storage/issues/188, https://github.com/swisspost/gateleen/issues/583, https://github.com/swisspost/gateleen/pull/584 --- .../kafka/KafkaProducerRecordBuilder.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java index 724fc3f6..80b40e9b 100644 --- a/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java +++ b/gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaProducerRecordBuilder.java @@ -52,33 +52,31 @@ class KafkaProducerRecordBuilder { * @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.) */ Future>> buildRecordsAsync(String topic, Buffer payload) { - return Future.succeededFuture().compose((Void v) -> { + return Future.succeededFuture().compose((Void v) -> vertx.executeBlocking(() -> { JsonObject payloadObj; try { payloadObj = new JsonObject(payload); } catch (DecodeException de) { - return Future.failedFuture(new ValidationException("Error while parsing payload", de)); + throw new ValidationException("Error while parsing payload", de); } JsonArray recordsArray; try { recordsArray = payloadObj.getJsonArray(RECORDS); } catch (ClassCastException cce) { - return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects")); + throw new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects"); } if (recordsArray == null) { - return Future.failedFuture(new ValidationException("Missing 'records' array")); + throw new ValidationException("Missing 'records' array"); } - return vertx.executeBlocking(() -> { - long beginEpchMs = currentTimeMillis(); - List> kafkaProducerRecords = new ArrayList<>(recordsArray.size()); - for (int i = 0; i < recordsArray.size(); i++) { - kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i))); - } - long durationMs = currentTimeMillis() - beginEpchMs; - log.debug("Serializing JSON did block thread for {}ms", durationMs); - return kafkaProducerRecords; - }); - }); + long beginEpchMs = currentTimeMillis(); + List> kafkaProducerRecords = new ArrayList<>(recordsArray.size()); + for (int i = 0; i < recordsArray.size(); i++) { + kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i))); + } + long durationMs = currentTimeMillis() - beginEpchMs; + log.debug("Serializing JSON did block thread for {}ms", durationMs); + return kafkaProducerRecords; + })); } /** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */