Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[improvement] Save resources on the BK threads by not accessing the m…
Browse files Browse the repository at this point in the history
…etrics context

(cherry picked from commit 200f21a)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 17, 2023
1 parent 1dda879 commit 6a7a9fc
Showing 1 changed file with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
Expand Down Expand Up @@ -196,22 +198,21 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ByteBuf buffer = (ByteBuf) msg;
requestStats.getNetworkTotalBytesIn().addCount(buffer.readableBytes());

OpStatsLogger requestParseLatencyStats = requestStats.getRequestParseLatencyStats();
// Update parse request latency metrics
final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
requestStats.getRequestParseLatencyStats().registerSuccessfulEvent(
requestParseLatencyStats.registerSuccessfulEvent(
MathUtils.elapsedNanos(timeBeforeParse), TimeUnit.NANOSECONDS);
};

// Update handle request latency metrics
final BiConsumer<ApiKeys, Long> registerRequestLatency = (apiKey, startProcessTime) -> {
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_LATENCY)
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
};

// If kop is enabled for authentication and the client
// has not completed the handshake authentication,
// execute channelPrepare to complete authentication
if (isActive.get() && !channelReady()) {
final BiConsumer<ApiKeys, Long> registerRequestLatency = (apiKey, startProcessTime) -> {
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_LATENCY)
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
};
try {
channelPrepare(ctx, buffer, registerRequestParseLatency, registerRequestLatency);
return;
Expand All @@ -235,6 +236,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
// potentially blocking until there is room in the queue for the request.
registerRequestParseLatency.accept(timeBeforeParse, null);

OpStatsLogger requestStatsLogger = requestStats.getRequestStatsLogger(kafkaHeaderAndRequest.header.apiKey(),
KopServerStats.REQUEST_LATENCY);
// Update handle request latency metrics
final Consumer<Long> registerRequestLatency = (startProcessTime) -> {
requestStatsLogger
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProcessTime), TimeUnit.NANOSECONDS);
};
try {
if (log.isDebugEnabled()) {
log.debug("[{}] Received kafka cmd {}, the request content is: {}",
Expand All @@ -255,12 +263,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
return;
}

registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey(),
startProcessRequestTimestamp);

sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
if (sendResponseScheduler != null) {
sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
registerRequestLatency.accept(startProcessRequestTimestamp);
writeAndFlushResponseToClient(channel);
});
} else {
registerRequestLatency.accept(startProcessRequestTimestamp);
writeAndFlushResponseToClient(channel);
});
}
});
// potentially blocking until there is room in the queue for the request.
requestQueue.put(ResponseAndRequest.of(responseFuture, kafkaHeaderAndRequest));
Expand Down

0 comments on commit 6a7a9fc

Please sign in to comment.