From dabbcf76441dd100f7a363c113f5093b32d5d36e Mon Sep 17 00:00:00 2001 From: Arturo Bernal Date: Mon, 30 Sep 2024 21:56:06 +0200 Subject: [PATCH] HTTPCLIENT-2233- Add metrics listener for IOReactor thread pool monitoring This change enhances monitoring capabilities for the IOReactor thread pool, providing better insights into performance and potential bottlenecks. --- .../impl/nio/bootstrap/H2AsyncRequester.java | 11 ++- .../bootstrap/H2MultiplexingRequester.java | 6 +- .../H2MultiplexingRequesterBootstrap.java | 16 +++- .../nio/bootstrap/H2RequesterBootstrap.java | 16 +++- .../impl/nio/bootstrap/H2ServerBootstrap.java | 11 ++- .../hc/core5/testing/nio/AsyncRequester.java | 3 +- .../hc/core5/testing/nio/AsyncServer.java | 3 +- .../nio/LoggingReactorMetricsListener.java | 68 +++++++++++++++++ .../http2/H2CompatibilityTest.java | 2 + .../nio/H2AsyncRequesterResource.java | 4 +- .../extension/nio/H2AsyncServerResource.java | 4 +- .../nio/H2MultiplexingRequesterResource.java | 4 +- .../nio/HttpAsyncRequesterResource.java | 4 +- .../nio/HttpAsyncServerResource.java | 4 +- .../core5/testing/nio/TLSIntegrationTest.java | 2 + .../http/impl/bootstrap/AsyncRequester.java | 8 +- .../bootstrap/AsyncRequesterBootstrap.java | 15 +++- .../http/impl/bootstrap/AsyncServer.java | 7 +- .../impl/bootstrap/AsyncServerBootstrap.java | 10 ++- .../impl/bootstrap/HttpAsyncRequester.java | 11 ++- .../http/impl/bootstrap/HttpAsyncServer.java | 11 ++- .../reactor/DefaultConnectingIOReactor.java | 17 ++++- .../reactor/DefaultListeningIOReactor.java | 16 +++- .../reactor/IOReactorMetricsListener.java | 71 +++++++++++++++++ .../hc/core5/reactor/IOSessionRequest.java | 14 ++++ .../hc/core5/reactor/SingleCoreIOReactor.java | 76 ++++++++++++++++++- .../hc/core5/reactor/IOWorkersTest.java | 2 +- 27 files changed, 380 insertions(+), 36 deletions(-) create mode 100644 httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingReactorMetricsListener.java create mode 100644 httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorMetricsListener.java diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java index e88a11f89c..9fe5850283 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2AsyncRequester.java @@ -44,6 +44,7 @@ import org.apache.hc.core5.pool.ManagedConnPool; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.reactor.ProtocolIOSession; @@ -71,8 +72,9 @@ public H2AsyncRequester( final Decorator ioSessionDecorator, final Callback exceptionCallback, final IOSessionListener sessionListener, - final ManagedConnPool connPool) { - super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool); + final ManagedConnPool connPool, + final IOReactorMetricsListener threadPoolListener) { + super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, threadPoolListener); this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE; } @@ -91,9 +93,10 @@ public H2AsyncRequester( final IOSessionListener sessionListener, final ManagedConnPool connPool, final TlsStrategy tlsStrategy, - final Timeout handshakeTimeout) { + final Timeout handshakeTimeout, + final IOReactorMetricsListener threadPoolListener) { super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, - tlsStrategy, handshakeTimeout); + tlsStrategy, handshakeTimeout, threadPoolListener); this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE; } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java index 90103e65b5..22cf0ecac6 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequester.java @@ -70,6 +70,7 @@ import org.apache.hc.core5.reactor.Command; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -96,9 +97,10 @@ public H2MultiplexingRequester( final Callback exceptionCallback, final IOSessionListener sessionListener, final Resolver addressResolver, - final TlsStrategy tlsStrategy) { + final TlsStrategy tlsStrategy, + final IOReactorMetricsListener threadPoolListener) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE); + ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener); this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java index 207bea1984..cd45a7bf77 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2MultiplexingRequesterBootstrap.java @@ -47,6 +47,7 @@ import org.apache.hc.core5.http2.nio.support.DefaultAsyncPushConsumerFactory; import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -71,6 +72,8 @@ public class H2MultiplexingRequesterBootstrap { private IOSessionListener sessionListener; private H2StreamListener streamListener; + private IOReactorMetricsListener threadPoolListener; + private H2MultiplexingRequesterBootstrap() { this.routeEntries = new ArrayList<>(); } @@ -164,6 +167,16 @@ public final H2MultiplexingRequesterBootstrap setIOSessionListener(final IOSessi return this; } + /** + * Sets {@link IOReactorMetricsListener} instance. + * + * @return this instance. + */ + public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final IOReactorMetricsListener threadPoolListener) { + this.threadPoolListener = threadPoolListener; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -243,7 +256,8 @@ public H2MultiplexingRequester create() { exceptionCallback, sessionListener, DefaultAddressResolver.INSTANCE, - tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy()); + tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), + threadPoolListener); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java index c0ea93a0d7..e460281a1d 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2RequesterBootstrap.java @@ -65,6 +65,7 @@ import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -99,6 +100,8 @@ public class H2RequesterBootstrap { private H2StreamListener streamListener; private Http1StreamListener http1StreamListener; private ConnPoolListener connPoolListener; + private IOReactorMetricsListener threadPoolListener; + private H2RequesterBootstrap() { this.routeEntries = new ArrayList<>(); @@ -249,6 +252,16 @@ public final H2RequesterBootstrap setIOSessionListener(final IOSessionListener s return this; } + /** + * Sets {@link IOReactorMetricsListener} instance. + * + * @return this instance. + */ + public final H2RequesterBootstrap setIOReactorMetricsListener(final IOReactorMetricsListener threadPoolListener) { + this.threadPoolListener = threadPoolListener; + return this; + } + /** * Sets {@link H2StreamListener} instance. * @@ -393,7 +406,8 @@ public H2AsyncRequester create() { sessionListener, connPool, actualTlsStrategy, - handshakeTimeout); + handshakeTimeout, + threadPoolListener); } } diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java index 822cbe7f4a..1400966f5d 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/bootstrap/H2ServerBootstrap.java @@ -71,6 +71,7 @@ import org.apache.hc.core5.net.URIAuthority; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -102,6 +103,7 @@ public class H2ServerBootstrap { private IOSessionListener sessionListener; private H2StreamListener h2StreamListener; private Http1StreamListener http1StreamListener; + private IOReactorMetricsListener threadPoolListener; private H2ServerBootstrap() { this.routeEntries = new ArrayList<>(); @@ -208,6 +210,12 @@ public final H2ServerBootstrap setIOSessionDecorator(final Decorator return this; } + + public final H2ServerBootstrap setIOReactorMetricsListener(final IOReactorMetricsListener threadPoolListener) { + this.threadPoolListener = threadPoolListener; + return this; + } + /** * Sets {@link Exception} {@link Callback} instance. * @@ -247,7 +255,6 @@ public final H2ServerBootstrap setStreamListener(final Http1StreamListener http1 this.http1StreamListener = http1StreamListener; return this; } - /** * @return this instance. * @deprecated Use {@link RequestRouter}. @@ -522,7 +529,7 @@ public HttpAsyncServer create() { handshakeTimeout); return new HttpAsyncServer(ioEventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, - sessionListener, actualCanonicalHostName); + sessionListener, actualCanonicalHostName, threadPoolListener); } } diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java index 3c229034df..2bfaea09a4 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java @@ -65,7 +65,8 @@ DefaultConnectingIOReactor createIOReactor( LoggingIOSessionDecorator.INSTANCE, LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + LoggingReactorMetricsListener.INSTANCE); } private InetSocketAddress toSocketAddress(final HttpHost host) { diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java index 562d33e8f2..a9d23b792d 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java @@ -60,7 +60,8 @@ DefaultListeningIOReactor createIOReactor( LoggingIOSessionDecorator.INSTANCE, LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + LoggingReactorMetricsListener.INSTANCE); } public Future listen(final InetSocketAddress address) { diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingReactorMetricsListener.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingReactorMetricsListener.java new file mode 100644 index 0000000000..2202afe9a1 --- /dev/null +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingReactorMetricsListener.java @@ -0,0 +1,68 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.testing.nio; + +import org.apache.hc.core5.reactor.IOReactorMetricsListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggingReactorMetricsListener implements IOReactorMetricsListener { + + public static final IOReactorMetricsListener INSTANCE = new LoggingReactorMetricsListener(); + + private final Logger logger = LoggerFactory.getLogger("org.apache.hc.core5.http.pool"); + + @Override + public void onThreadPoolStatus(final int activeThreads, final int pendingConnections) { + if (logger.isDebugEnabled()) { + logger.debug("Active threads: {}, Pending connections: {}", activeThreads, pendingConnections); + } + } + + @Override + public void onThreadPoolSaturation(final double saturationPercentage) { + if (logger.isDebugEnabled()) { + logger.debug("Thread pool saturation: {}%", saturationPercentage); + } + } + + @Override + public void onResourceStarvationDetected() { + if (logger.isDebugEnabled()) { + logger.debug("Resource starvation detected!"); + } + } + + @Override + public void onQueueWaitTime(final long averageWaitTimeMillis) { + if (logger.isDebugEnabled()) { + logger.debug("Average queue wait time: {} ms", averageWaitTimeMillis); + } + } +} + diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/http2/H2CompatibilityTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/http2/H2CompatibilityTest.java index 41cadedc99..915b4c4b3e 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/http2/H2CompatibilityTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/compatibility/http2/H2CompatibilityTest.java @@ -68,6 +68,7 @@ import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.apache.hc.core5.util.TextUtils; import org.apache.hc.core5.util.Timeout; @@ -110,6 +111,7 @@ public static void main(final String... args) throws Exception { .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE) .create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncRequesterResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncRequesterResource.java index 6ed7cfc9f0..c343de763b 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncRequesterResource.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncRequesterResource.java @@ -40,6 +40,7 @@ import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -69,7 +70,8 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception .setConnPoolListener(LoggingConnPoolListener.INSTANCE) .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) - .setIOSessionListener(LoggingIOSessionListener.INSTANCE); + .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE); bootstrapCustomizer.accept(bootstrap); requester = bootstrap.create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncServerResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncServerResource.java index 188eac814f..91e4ae56e0 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncServerResource.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2AsyncServerResource.java @@ -37,6 +37,7 @@ import org.apache.hc.core5.testing.nio.LoggingExceptionCallback; import org.apache.hc.core5.testing.nio.LoggingH2StreamListener; import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; import org.junit.jupiter.api.Assertions; @@ -68,7 +69,8 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception .setStreamListener(LoggingH2StreamListener.INSTANCE) .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) - .setIOSessionListener(LoggingIOSessionListener.INSTANCE); + .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE); bootstrapCustomizer.accept(bootstrap); server = bootstrap.create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2MultiplexingRequesterResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2MultiplexingRequesterResource.java index 1739ef893d..144d805717 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2MultiplexingRequesterResource.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/H2MultiplexingRequesterResource.java @@ -38,6 +38,7 @@ import org.apache.hc.core5.testing.nio.LoggingH2StreamListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -65,7 +66,8 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception .setStreamListener(LoggingH2StreamListener.INSTANCE) .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) - .setIOSessionListener(LoggingIOSessionListener.INSTANCE); + .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE); bootstrapCustomizer.accept(bootstrap); requester = bootstrap.create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncRequesterResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncRequesterResource.java index 18a5ac67f1..9eadba5565 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncRequesterResource.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncRequesterResource.java @@ -38,6 +38,7 @@ import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; @@ -67,7 +68,8 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception .setIOSessionListener(LoggingIOSessionListener.INSTANCE) .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT) .setConnPoolListener(LoggingConnPoolListener.INSTANCE) - .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE); + .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE); bootstrapCustomizer.accept(bootstrap); requester = bootstrap.create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncServerResource.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncServerResource.java index d7ee307e4d..a522d7e76d 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncServerResource.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/extension/nio/HttpAsyncServerResource.java @@ -36,6 +36,7 @@ import org.apache.hc.core5.testing.SSLTestContexts; import org.apache.hc.core5.testing.nio.LoggingExceptionCallback; import org.apache.hc.core5.testing.nio.LoggingHttp1StreamListener; +import org.apache.hc.core5.testing.nio.LoggingReactorMetricsListener; import org.apache.hc.core5.testing.nio.LoggingIOSessionDecorator; import org.apache.hc.core5.testing.nio.LoggingIOSessionListener; import org.junit.jupiter.api.Assertions; @@ -66,7 +67,8 @@ public void beforeEach(final ExtensionContext extensionContext) throws Exception .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER) .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) - .setIOSessionListener(LoggingIOSessionListener.INSTANCE); + .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE); bootstrapCustomizer.accept(bootstrap); server = bootstrap.create(); } diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TLSIntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TLSIntegrationTest.java index d1d54687c2..e84581832a 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TLSIntegrationTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/TLSIntegrationTest.java @@ -147,6 +147,7 @@ HttpAsyncServer createServer(final TlsStrategy tlsStrategy) { .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE) .setRequestRouter(RequestRouter.>builder() .addRoute(RequestRouter.LOCAL_AUTHORITY, "*", () -> new EchoHandler(2048)) .resolveAuthority(RequestRouter.LOCAL_AUTHORITY_RESOLVER) @@ -165,6 +166,7 @@ HttpAsyncRequester createClient(final TlsStrategy tlsStrategy) { .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE) .setExceptionCallback(LoggingExceptionCallback.INSTANCE) .setIOSessionListener(LoggingIOSessionListener.INSTANCE) + .setIOReactorMetricsListener(LoggingReactorMetricsListener.INSTANCE) .create(); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java index 2e91765c7e..bbac37997a 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java @@ -49,6 +49,7 @@ import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.reactor.IOReactorService; import org.apache.hc.core5.reactor.IOReactorStatus; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -73,7 +74,9 @@ public AsyncRequester( final Callback exceptionCallback, final IOSessionListener sessionListener, final Callback sessionShutdownCallback, - final Resolver addressResolver) { + final Resolver addressResolver, + final IOReactorMetricsListener threadPoolListener + ) { this.ioReactor = new DefaultConnectingIOReactor( eventHandlerFactory, ioReactorConfig, @@ -81,7 +84,8 @@ public AsyncRequester( ioSessionDecorator, exceptionCallback, sessionListener, - sessionShutdownCallback); + sessionShutdownCallback, + threadPoolListener); this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE; } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java index 12611bf5ab..204655b543 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequesterBootstrap.java @@ -49,6 +49,7 @@ import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Timeout; @@ -77,6 +78,7 @@ public class AsyncRequesterBootstrap { private IOSessionListener sessionListener; private Http1StreamListener streamListener; private ConnPoolListener connPoolListener; + private IOReactorMetricsListener threadPoolListener; private AsyncRequesterBootstrap() { } @@ -216,6 +218,16 @@ public final AsyncRequesterBootstrap setIOSessionListener(final IOSessionListene return this; } + /** + * Sets {@link IOReactorMetricsListener} instance. + * + * @return this instance. + */ + public final AsyncRequesterBootstrap setIOReactorMetricsListener(final IOReactorMetricsListener threadPoolListener) { + this.threadPoolListener = threadPoolListener; + return this; + } + /** * Sets {@link Http1StreamListener} instance. * @@ -279,7 +291,8 @@ public HttpAsyncRequester create() { sessionListener, connPool, tlsStrategyCopy, - handshakeTimeout); + handshakeTimeout, + threadPoolListener); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java index 97b241d4c0..e2712c4361 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java @@ -45,6 +45,7 @@ import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.reactor.IOReactorService; import org.apache.hc.core5.reactor.IOReactorStatus; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.reactor.ListenerEndpoint; @@ -64,7 +65,8 @@ public AsyncServer( final Decorator ioSessionDecorator, final Callback exceptionCallback, final IOSessionListener sessionListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOReactorMetricsListener threadPoolListener) { this.ioReactor = new DefaultListeningIOReactor( eventHandlerFactory, ioReactorConfig, @@ -73,7 +75,8 @@ public AsyncServer( ioSessionDecorator, exceptionCallback, sessionListener, - sessionShutdownCallback); + sessionShutdownCallback, + threadPoolListener); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java index fae3749eb7..6e7694c4a1 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServerBootstrap.java @@ -64,6 +64,7 @@ import org.apache.hc.core5.net.URIAuthority; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.util.Args; @@ -93,6 +94,7 @@ public class AsyncServerBootstrap { private Callback exceptionCallback; private IOSessionListener sessionListener; private Http1StreamListener streamListener; + private IOReactorMetricsListener threadPoolListener; private AsyncServerBootstrap() { this.routeEntries = new ArrayList<>(); @@ -207,6 +209,12 @@ public final AsyncServerBootstrap setIOSessionDecorator(final Decorator connPool, final TlsStrategy tlsStrategy, - final Timeout handshakeTimeout) { + final Timeout handshakeTimeout, + final IOReactorMetricsListener threadPoolListener) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE); + ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, threadPoolListener); this.connPool = Args.notNull(connPool, "Connection pool"); this.tlsStrategy = tlsStrategy; this.handshakeTimeout = handshakeTimeout; @@ -129,9 +131,10 @@ public HttpAsyncRequester( final Decorator ioSessionDecorator, final Callback exceptionCallback, final IOSessionListener sessionListener, - final ManagedConnPool connPool) { + final ManagedConnPool connPool, + final IOReactorMetricsListener threadPoolListener) { this(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool, - null, null); + null, null, threadPoolListener); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java index 32dde4c33a..5d4f21b7ba 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncServer.java @@ -39,6 +39,7 @@ import org.apache.hc.core5.http.nio.command.ShutdownCommand; import org.apache.hc.core5.reactor.IOEventHandlerFactory; import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.reactor.IOReactorMetricsListener; import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.reactor.IOSessionListener; import org.apache.hc.core5.reactor.ListenerEndpoint; @@ -64,9 +65,10 @@ public HttpAsyncServer( final Decorator ioSessionDecorator, final Callback exceptionCallback, final IOSessionListener sessionListener, - final String canonicalName) { + final String canonicalName, + final IOReactorMetricsListener threadPoolListener) { super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, - ShutdownCommand.GRACEFUL_NORMAL_CALLBACK); + ShutdownCommand.GRACEFUL_NORMAL_CALLBACK, threadPoolListener); this.canonicalName = canonicalName; } @@ -79,8 +81,9 @@ public HttpAsyncServer( final IOReactorConfig ioReactorConfig, final Decorator ioSessionDecorator, final Callback exceptionCallback, - final IOSessionListener sessionListener) { - this(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, null); + final IOSessionListener sessionListener, + final IOReactorMetricsListener threadPoolListener) { + this(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, null, threadPoolListener); } /** diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java index bcb1a30fd2..530029984e 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java @@ -62,7 +62,8 @@ public DefaultConnectingIOReactor( final Decorator ioSessionDecorator, final Callback exceptionCallback, final IOSessionListener sessionListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOReactorMetricsListener threadPoolListener) { Args.notNull(eventHandlerFactory, "Event handler factory"); this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; @@ -74,7 +75,8 @@ public DefaultConnectingIOReactor( ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT, ioSessionDecorator, sessionListener, - sessionShutdownCallback); + sessionShutdownCallback, + threadPoolListener); this.workers[i] = dispatcher; threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } @@ -82,6 +84,17 @@ public DefaultConnectingIOReactor( this.workerSelector = IOWorkers.newSelector(workers); } + public DefaultConnectingIOReactor( + final IOEventHandlerFactory eventHandlerFactory, + final IOReactorConfig ioReactorConfig, + final ThreadFactory threadFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Callback sessionShutdownCallback) { + this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, sessionShutdownCallback, null); + } + public DefaultConnectingIOReactor( final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig config, diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java index fe96775116..403750ee9b 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java @@ -80,6 +80,19 @@ public DefaultListeningIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final Callback sessionShutdownCallback) { + this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, exceptionCallback, sessionListener, sessionShutdownCallback, null); + } + + public DefaultListeningIOReactor( + final IOEventHandlerFactory eventHandlerFactory, + final IOReactorConfig ioReactorConfig, + final ThreadFactory dispatchThreadFactory, + final ThreadFactory listenerThreadFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Callback sessionShutdownCallback, + final IOReactorMetricsListener threadPoolListener) { Args.notNull(eventHandlerFactory, "Event handler factory"); this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; @@ -91,7 +104,8 @@ public DefaultListeningIOReactor( ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT, ioSessionDecorator, sessionListener, - sessionShutdownCallback); + sessionShutdownCallback, + threadPoolListener); this.workers[i] = dispatcher; threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorMetricsListener.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorMetricsListener.java new file mode 100644 index 0000000000..b924d68b69 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOReactorMetricsListener.java @@ -0,0 +1,71 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.core5.reactor; + +/** + * A listener interface for receiving metrics related to the I/O reactor's + * thread pool performance and status. + * + *

The implementing class can monitor and act upon important metrics such as + * active threads, saturation levels, and resource starvation.

+ * + * @since 5.5 + */ +public interface IOReactorMetricsListener { + + /** + * Invoked to report the current status of the thread pool, including + * active threads and pending connections. + * + * @param activeThreads The number of active threads handling connections. + * @param pendingConnections The number of pending connection requests in the queue. + */ + void onThreadPoolStatus(int activeThreads, int pendingConnections); + + /** + * Invoked to report the saturation level of the thread pool as a percentage + * of the active threads to the maximum allowed connections. + * + * @param saturationPercentage The percentage indicating thread pool saturation. + */ + void onThreadPoolSaturation(double saturationPercentage); + + /** + * Invoked when the number of pending connection requests exceeds the + * maximum allowed connections, indicating possible resource starvation. + */ + void onResourceStarvationDetected(); + + /** + * Notifies about the average wait time for connection requests in the queue. + * + * @param averageWaitTimeMillis average time in milliseconds that connection requests spend in the queue. + */ + void onQueueWaitTime(long averageWaitTimeMillis); +} + diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionRequest.java index c808e0ac8f..57f727194f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionRequest.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionRequest.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.io.CloseMode; @@ -50,6 +51,9 @@ final class IOSessionRequest implements Future { final Object attachment; final BasicFuture future; + private final long enqueueTime; + + private final AtomicReference closeableRef; public IOSessionRequest( @@ -67,6 +71,9 @@ public IOSessionRequest( this.attachment = attachment; this.future = new BasicFuture<>(callback); this.closeableRef = new AtomicReference<>(); + + // Set the time when this request is created + this.enqueueTime = System.currentTimeMillis(); } public void completed(final ProtocolIOSession ioSession) { @@ -127,4 +134,11 @@ public String toString() { ']'; } + // Getter for enqueueTime + @Internal + public long getEnqueueTime() { + return enqueueTime; + } + + } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java index 8661d67f29..8eee0aa2ab 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java @@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Callback; @@ -68,6 +70,11 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect private final AtomicBoolean shutdownInitiated; private final long selectTimeoutMillis; private volatile long lastTimeoutCheckMillis; + private final IOReactorMetricsListener threadPoolListener; + + // Atomic variables for tracking total wait time and count of processed requests + private final AtomicLong totalWaitTime = new AtomicLong(0); + private final AtomicInteger processedRequestCount = new AtomicInteger(0); SingleCoreIOReactor( final Callback exceptionCallback, @@ -75,7 +82,8 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect final IOReactorConfig reactorConfig, final Decorator ioSessionDecorator, final IOSessionListener sessionListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final IOReactorMetricsListener threadPoolListener) { super(exceptionCallback); this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory"); this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config"); @@ -87,6 +95,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect this.channelQueue = new ConcurrentLinkedQueue<>(); this.requestQueue = new ConcurrentLinkedQueue<>(); this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds(); + this.threadPoolListener = threadPoolListener; } void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException { @@ -136,6 +145,8 @@ void doExecute() throws IOException { processPendingConnectionRequests(); } + reportStatusToThreadPoolListener(); + // Exit select loop if graceful shutdown has been completed if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) { break; @@ -315,6 +326,18 @@ private void validateAddress(final SocketAddress address) throws UnknownHostExce private void processPendingConnectionRequests() { IOSessionRequest sessionRequest; for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) { + + // Calculate wait time safely without keeping long-lived state + final long waitTimeMillis = System.currentTimeMillis() - sessionRequest.getEnqueueTime(); + + // Accumulate total wait time and increment count atomically + totalWaitTime.addAndGet(waitTimeMillis); + processedRequestCount.incrementAndGet(); + + if (threadPoolListener != null) { + threadPoolListener.onQueueWaitTime(waitTimeMillis); + } + if (!sessionRequest.isCancelled()) { final SocketChannel socketChannel; try { @@ -394,4 +417,55 @@ private void closePendingConnectionRequests() { } } + /** + * Reports the current status of the I/O reactor's thread pool to the + * configured metrics listener. + * + *

This method gathers three key metrics:

+ *
    + *
  • Active Threads: The number of currently active threads + * handling I/O sessions.
  • + *
  • Pending Connections: The number of connection requests + * waiting to be processed.
  • + *
  • Saturation Percentage: The ratio of active threads to the + * maximum allowed connections (defined by {@code MAX_CHANNEL_REQUESTS}), + * expressed as a percentage. It provides insight into how saturated the thread + * pool is relative to its maximum capacity. The formula for calculating saturation + * is: + *
    +     *     saturationPercentage = (activeThreads / MAX_CHANNEL_REQUESTS) * 100.0
    +     *     
  • + *
+ *

+ * If the number of pending connections exceeds {@code MAX_CHANNEL_REQUESTS}, + * resource starvation is detected, and an appropriate event is reported. + * + */ + private void reportStatusToThreadPoolListener() { + if (threadPoolListener != null) { + + // Calculate the number of active threads (connections) + final int activeThreads = (int) this.selector.keys().stream() + .filter(key -> key.isValid() && key.attachment() instanceof InternalChannel) + .count(); + + // Calculate the number of pending connection requests + final int pendingConnections = this.requestQueue.size(); + + // Calculate saturation as a percentage of active connections to max allowed connections + final double saturationPercentage = ((double) activeThreads / MAX_CHANNEL_REQUESTS) * 100.0; + + // Report thread pool status: active sessions and pending connections + threadPoolListener.onThreadPoolStatus(activeThreads, pendingConnections); + + // Report thread pool saturation + threadPoolListener.onThreadPoolSaturation(saturationPercentage); + + // Detect resource starvation if pending connections exceed threshold + if (pendingConnections > MAX_CHANNEL_REQUESTS) { + threadPoolListener.onResourceStarvationDetected(); + } + } + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java index 1de15a4b02..c087af4320 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java @@ -34,7 +34,7 @@ class IOWorkersTest { @Test void testIndexOverflow() { - final SingleCoreIOReactor reactor = new SingleCoreIOReactor(null, mock(IOEventHandlerFactory.class), IOReactorConfig.DEFAULT, null, null, null); + final SingleCoreIOReactor reactor = new SingleCoreIOReactor(null, mock(IOEventHandlerFactory.class), IOReactorConfig.DEFAULT, null, null, null, null); final IOWorkers.Selector selector = IOWorkers.newSelector(new SingleCoreIOReactor[]{reactor, reactor, reactor}); for (long i = Integer.MAX_VALUE - 10; i < (long) Integer.MAX_VALUE + 10; i++) { selector.next();