From 6a7a9fc2543438dc146b3ef56b7fd2b85597287e Mon Sep 17 00:00:00 2001
From: Enrico Olivelli
Date: Fri, 11 Nov 2022 09:34:57 +0100
Subject: [PATCH] [improvement] Save resources on the BK threads by not
 accessing the metrics context

(cherry picked from commit 200f21a8383470042ba692b51b6eb1a2767b5a0b)
---
 .../handlers/kop/KafkaCommandDecoder.java | 35 ++++++++++++-------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
index 2157782e99..bb76f6af7f 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java
@@ -32,11 +32,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.util.MathUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -196,22 +198,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
         ByteBuf buffer = (ByteBuf) msg;
         requestStats.getNetworkTotalBytesIn().addCount(buffer.readableBytes());
 
+        OpStatsLogger requestParseLatencyStats = requestStats.getRequestParseLatencyStats();
         // Update parse request latency metrics
         final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
-            requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(
+            requestParseLatencyStats.registerSuccessfulEvent(
                     MathUtils.elapsedNanos(timeBeforeParse), TimeUnit.NANOSECONDS);
         };
 
-        // Update handle request latency metrics
-        final BiConsumer<ApiKeys, Long> registerRequestLatency = (apiKey, startProcessTime) -> {
-            requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_LATENCY)
-                    .registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
-        };
-
         // If kop is enabled for authentication and the client
         // has not completed the handshake authentication,
         // execute channelPrepare to complete authentication
         if (isActive.get() && !channelReady()) {
+            final BiConsumer<ApiKeys, Long> registerRequestLatency = (apiKey, startProcessTime) -> {
+                requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_LATENCY)
+                        .registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
+            };
             try {
                 channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
                 return;
@@ -235,6 +236,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
         // potentially blocking until there is room in the queue for the request.
         registerRequestParseLatency.accept(timeBeforeParse, null);
 
+        OpStatsLogger requestStatsLogger = requestStats.getRequestStatsLogger(kafkaHeaderAndRequest.header.apiKey(),
+                KopServerStats.REQUEST_LATENCY);
+        // Update handle request latency metrics
+        final Consumer<Long> registerRequestLatency = (startProcessTime) -> {
+            requestStatsLogger
+                    .registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
+        };
         try {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received kafka cmd {}, the request content is: {}",
@@ -255,12 +263,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                     return;
                 }
 
-                registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey(),
-                        startProcessRequestTimestamp);
-
-                sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
+                if (sendResponseScheduler != null) {
+                    sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
+                        registerRequestLatency.accept(startProcessRequestTimestamp);
+                        writeAndFlushResponseToClient(channel);
+                    });
+                } else {
+                    registerRequestLatency.accept(startProcessRequestTimestamp);
                     writeAndFlushResponseToClient(channel);
-                });
+                }
             });
             // potentially blocking until there is room in the queue for the request.
             requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
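
The change applies one pattern throughout: resolve the OpStatsLogger once, on the thread
that accepts the request, and let any callback that may later run on a BookKeeper or
response thread only record into the already-captured logger instead of going back
through the metrics context. Below is a minimal sketch of that pattern, not the KoP code:
StatsRegistry, loggerFor and doWorkAsync are hypothetical stand-ins.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.bookkeeper.common.util.MathUtils;
    import org.apache.bookkeeper.stats.OpStatsLogger;

    final class LatencyRecordingSketch {

        // Hypothetical registry standing in for the real stats holder; only the pattern matters.
        interface StatsRegistry {
            OpStatsLogger loggerFor(String apiName);
        }

        static CompletableFuture<Void> handle(StatsRegistry stats, String apiName) {
            // Resolve the logger once, up front, on the submitting thread.
            final OpStatsLogger latency = stats.loggerFor(apiName);
            final long start = MathUtils.nowInNano();

            // The completion callback may run on another thread; it only touches the
            // captured logger, never the stats registry.
            return doWorkAsync().thenRun(() -> latency.registerSuccessfulEvent(
                    MathUtils.elapsedNanos(start), TimeUnit.NANOSECONDS));
        }

        // Placeholder for the real asynchronous request handling.
        static CompletableFuture<Void> doWorkAsync() {
            return CompletableFuture.completedFuture(null);
        }
    }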