Skip to content

Commit

Permalink
[SDCISA-16147, swisspost#583] Fix another make-json-EventLoop-blocker
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed Jun 7, 2024
1 parent c36b202 commit 400f70f
Showing 1 changed file with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,31 @@ class KafkaProducerRecordBuilder {
* @throws ValidationException when the payload is not valid (missing properties, wrong types, etc.)
*/
Future<List<KafkaProducerRecord<String, String>>> buildRecordsAsync(String topic, Buffer payload) {
return Future.<Void>succeededFuture().compose((Void v) -> {
return Future.<Void>succeededFuture().compose((Void v) -> vertx.executeBlocking(() -> {
JsonObject payloadObj;
try {
payloadObj = new JsonObject(payload);
} catch (DecodeException de) {
return Future.failedFuture(new ValidationException("Error while parsing payload", de));
throw new ValidationException("Error while parsing payload", de);
}
JsonArray recordsArray;
try {
recordsArray = payloadObj.getJsonArray(RECORDS);
} catch (ClassCastException cce) {
return Future.failedFuture(new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects"));
throw new ValidationException("Property '" + RECORDS + "' must be of type JsonArray holding JsonObject objects");
}
if (recordsArray == null) {
return Future.failedFuture(new ValidationException("Missing 'records' array"));
throw new ValidationException("Missing 'records' array");
}
return vertx.executeBlocking(() -> {
long beginEpchMs = currentTimeMillis();
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>(recordsArray.size());
for (int i = 0; i < recordsArray.size(); i++) {
kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i)));
}
long durationMs = currentTimeMillis() - beginEpchMs;
log.debug("Serializing JSON did block thread for {}ms", durationMs);
return kafkaProducerRecords;
});
});
long beginEpchMs = currentTimeMillis();
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>(recordsArray.size());
for (int i = 0; i < recordsArray.size(); i++) {
kafkaProducerRecords.add(fromRecordJsonObject(topic, recordsArray.getJsonObject(i)));
}
long durationMs = currentTimeMillis() - beginEpchMs;
log.debug("Serializing JSON did block thread for {}ms", durationMs);
return kafkaProducerRecords;
}));
}

/** @deprecated Use {@link #buildRecordsAsync(String, Buffer)}. */
Expand Down

0 comments on commit 400f70f

Please sign in to comment.