From 5c7531e1ac5f51230303acc4ef63f345239d84bb Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 25 Jun 2024 11:48:26 +0200 Subject: [PATCH] [SDCISA-16197, #590] Move buffer allocation to another thread. Related: - #590 - #583 - [QueueStatisticsCollector takes up to 10 minutes for ONE SINGLE request](https://github.com/swisspost/vertx-redisques/issues/188) - [Performance Excuses Debunked](https://m.youtube.com/watch?v=x2EOOJg8FkA) --- .../gateleen/queue/queuing/QueueProcessor.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueProcessor.java b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueProcessor.java index bd3f040d..e23777df 100755 --- a/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueProcessor.java +++ b/gateleen-queue/src/main/java/org/swisspush/gateleen/queue/queuing/QueueProcessor.java @@ -24,6 +24,10 @@ import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueCircuitState; import org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType; +import static io.vertx.core.Future.failedFuture; +import static io.vertx.core.Future.succeededFuture; +import static io.vertx.core.buffer.Buffer.buffer; +import static java.lang.System.currentTimeMillis; import static org.swisspush.gateleen.core.exception.GateleenExceptionFactory.newGateleenThriftyExceptionFactory; import static org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType.FAILURE; import static org.swisspush.gateleen.queue.queuing.circuitbreaker.util.QueueResponseType.SUCCESS; @@ -290,7 +294,18 @@ private void executeQueuedRequest(Message message, Logger logger, Ht }; request1.idleTimeout(120000); // avoids blocking other requests if (queuedRequest.getPayload() != null) { - request1.send(Buffer.buffer(queuedRequest.getPayload()), httpAsyncHandler); + vertx.executeBlocking(() -> { + long beginEpchMs = currentTimeMillis(); + Buffer payload = buffer(queuedRequest.getPayload()); + long durationMs = currentTimeMillis() - beginEpchMs; + if (durationMs > 16) logger.debug("Creating buffer of size {} took {}ms", payload.length(), durationMs); + return payload; + }, false).compose((Buffer payload) -> { + request1.send(payload, httpAsyncHandler); + return succeededFuture(); + }).onFailure((Throwable ex) -> { + httpAsyncHandler.handle(failedFuture(ex)); + }); } else { request1.send(httpAsyncHandler); }