From 5ae7079beddd51861a0e6f76cc25ea0a6c089b19 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Tue, 3 Sep 2024 10:49:38 +0000 Subject: [PATCH 1/8] Detect transport executors with no remaining threads --- .../io/grpc/okhttp/OkHttpClientTransport.java | 26 +++++++++++++++++++ .../okhttp/OkHttpClientTransportTest.java | 26 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 29d3dbc1cdf..10857de2005 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -600,6 +600,32 @@ public void run() { // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it // may send goAway immediately. executor.execute(clientFrameHandler); + + executor.execute(new Runnable() { + @Override + public void run() { + synchronized (lock) { + lock.notify(); + } + } + }); + + long waitStartTime = System.currentTimeMillis(); + synchronized (lock) { + try { + lock.wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + long waitEndTime = System.currentTimeMillis(); + if (waitEndTime - waitStartTime >= 1000) { // never got notified + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Handshake timed out due to insufficient threads")); + return; + } + synchronized (lock) { maxConcurrentStreams = Integer.MAX_VALUE; startPendingStreams(); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index ab7dff98444..0648020649f 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -247,6 +247,32 @@ public void testToString() throws Exception { assertTrue("Unexpected: " + s, s.contains(address.toString())); } + @Test + public void testTransportExecutorWithTooFewThreads() throws Exception { + ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1); + channelBuilder.transportExecutor(fixedPoolExecutor); + InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); + clientTransport = new OkHttpClientTransport( + channelBuilder.buildTransportFactory(), + address, + "hostname", + null, + EAG_ATTRS, + NO_PROXY, + tooManyPingsRunnable); + + // Start the transport + clientTransport.start(transportListener); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture()); + + // Verify that the description of the captured Status is as expected + Status capturedStatus = statusCaptor.getValue(); + System.out.println(capturedStatus); + assertEquals("Handshake timed out due to insufficient threads", + capturedStatus.getDescription()); + } + /** * Test logging is functioning correctly for client received Http/2 frames. Not intended to test * actual frame content being logged. From 1a56f71691977613b9c1fcfb72598163d9913563 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Mon, 9 Sep 2024 08:14:30 +0000 Subject: [PATCH 2/8] OkHttp: Detect transport executors with no remaining threads --- .../io/grpc/okhttp/OkHttpClientTransport.java | 50 +++++++++---------- .../okhttp/OkHttpClientTransportTest.java | 19 ++----- 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 10857de2005..17df8b69b59 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -499,6 +499,29 @@ public Runnable start(Listener listener) { outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); + // This runs con-concurrently with handshake and works as a hack checking enough threads are + // available to start the transport. + executor.execute(new Runnable() { + @Override + public void run() { + long waitStartTime = System.nanoTime(); + synchronized (lock) { + try { + lock.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + long waitEndTime = System.nanoTime(); + System.out.println(waitEndTime - waitStartTime); + if (waitEndTime - waitStartTime >= 100000000) { // never got notified + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + } + } + }); + // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. serializingExecutor.execute(new Runnable() { @@ -581,6 +604,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() if (sslSession != null) { securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); } + lock.notify(); } } }); @@ -600,32 +624,6 @@ public void run() { // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it // may send goAway immediately. executor.execute(clientFrameHandler); - - executor.execute(new Runnable() { - @Override - public void run() { - synchronized (lock) { - lock.notify(); - } - } - }); - - long waitStartTime = System.currentTimeMillis(); - synchronized (lock) { - try { - lock.wait(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - long waitEndTime = System.currentTimeMillis(); - if (waitEndTime - waitStartTime >= 1000) { // never got notified - startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE - .withDescription("Handshake timed out due to insufficient threads")); - return; - } - synchronized (lock) { maxConcurrentStreams = Integer.MAX_VALUE; startPendingStreams(); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 0648020649f..f3ddf0d1dce 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -251,25 +251,12 @@ public void testToString() throws Exception { public void testTransportExecutorWithTooFewThreads() throws Exception { ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1); channelBuilder.transportExecutor(fixedPoolExecutor); - InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); - clientTransport = new OkHttpClientTransport( - channelBuilder.buildTransportFactory(), - address, - "hostname", - null, - EAG_ATTRS, - NO_PROXY, - tooManyPingsRunnable); - - // Start the transport - clientTransport.start(transportListener); + initTransport(); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture()); - - // Verify that the description of the captured Status is as expected Status capturedStatus = statusCaptor.getValue(); - System.out.println(capturedStatus); - assertEquals("Handshake timed out due to insufficient threads", + assertEquals("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads", capturedStatus.getDescription()); } From a6201c5546d5152caa4b50c7195a00cc54b7d038 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Mon, 9 Sep 2024 08:47:14 +0000 Subject: [PATCH 3/8] OkHttp: Detect transport executors with no remaining threads --- okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 17df8b69b59..3d758d2f258 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -502,6 +502,7 @@ public Runnable start(Listener listener) { // This runs con-concurrently with handshake and works as a hack checking enough threads are // available to start the transport. executor.execute(new Runnable() { + @SuppressWarnings("WaitNotInLoop") @Override public void run() { long waitStartTime = System.nanoTime(); @@ -513,7 +514,6 @@ public void run() { } } long waitEndTime = System.nanoTime(); - System.out.println(waitEndTime - waitStartTime); if (waitEndTime - waitStartTime >= 100000000) { // never got notified startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE .withDescription("Timed out waiting for second handshake thread. " From 55f567c52428bfc8c2cc1516165dd75e3d46d010 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Wed, 11 Sep 2024 06:17:04 +0000 Subject: [PATCH 4/8] Resolving blockers --- .../io/grpc/okhttp/OkHttpClientTransport.java | 62 +++++++++++-------- .../okhttp/OkHttpClientTransportTest.java | 1 + 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 3d758d2f258..f1014cd7621 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -83,7 +83,9 @@ import java.util.Locale; import java.util.Map; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; @@ -499,31 +501,16 @@ public Runnable start(Listener listener) { outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); - // This runs con-concurrently with handshake and works as a hack checking enough threads are - // available to start the transport. - executor.execute(new Runnable() { - @SuppressWarnings("WaitNotInLoop") - @Override - public void run() { - long waitStartTime = System.nanoTime(); - synchronized (lock) { - try { - lock.wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - long waitEndTime = System.nanoTime(); - if (waitEndTime - waitStartTime >= 100000000) { // never got notified - startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE - .withDescription("Timed out waiting for second handshake thread. " - + "The transport executor pool may have run out of threads")); - } - } - }); - + final CountDownLatch latchForExtraThread = new CountDownLatch(1); + // The transport needs up to two threads to function once started, + // but only needs one during handshaking. Start another thread during handshaking + // to make sure there's still a free thread available. If the number of threads is exhausted, + // it is better to kill the transport than for all the transports to hang unable to send. + CyclicBarrier barrier = new CyclicBarrier(2); // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. + + long waitStartTime = System.nanoTime(); serializingExecutor.execute(new Runnable() { @Override public void run() { @@ -532,8 +519,21 @@ public void run() { // network is not available during startup while another thread holding lock to send the // initial preface. try { + barrier.await(); + long waitEndTime = System.nanoTime(); + System.out.println("after barrier"); + if (waitEndTime - waitStartTime > 10000000) { + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + if (connectedFuture != null) { + connectedFuture.set(null); + } + System.out.println("started goaway"); + return; + } latch.await(); - } catch (InterruptedException e) { + } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } // Use closed source on failure so that the reader immediately shuts down. @@ -604,7 +604,19 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() if (sslSession != null) { securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); } - lock.notify(); + latchForExtraThread.countDown(); + } + } + }); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + latchForExtraThread.await(); + } catch (BrokenBarrierException | InterruptedException e) { + Thread.currentThread().interrupt(); } } }); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index f3ddf0d1dce..37f3e92f514 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -1736,6 +1736,7 @@ public void unreachableServer() throws Exception { EAG_ATTRS, NO_PROXY, tooManyPingsRunnable); + //hi ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); From de44623b6749b1aad8bd2c897b5da402e349aaad Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Wed, 11 Sep 2024 09:52:44 +0000 Subject: [PATCH 5/8] Resolving blockers --- cronet/build.gradle | 2 +- .../io/grpc/okhttp/OkHttpClientTransport.java | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/cronet/build.gradle b/cronet/build.gradle index 3252a9d249b..ec5c090acc3 100644 --- a/cronet/build.gradle +++ b/cronet/build.gradle @@ -58,7 +58,7 @@ dependencies { task javadocs(type: Javadoc) { source = android.sourceSets.main.java.srcDirs - classpath += files(android.getBootClasspath()) + // classpath += files(android.getBootClasspath()) classpath += files({ android.libraryVariants.collect { variant -> variant.javaCompileProvider.get().classpath diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index f1014cd7621..2a0a7be4025 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -88,6 +88,8 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -510,7 +512,7 @@ public Runnable start(Listener listener) { // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. - long waitStartTime = System.nanoTime(); + System.out.println("before executor"); serializingExecutor.execute(new Runnable() { @Override public void run() { @@ -519,10 +521,12 @@ public void run() { // network is not available during startup while another thread holding lock to send the // initial preface. try { - barrier.await(); + System.out.println("in serializing thread"); + long waitStartTime = System.nanoTime(); + barrier.await(10000000, TimeUnit.NANOSECONDS); long waitEndTime = System.nanoTime(); System.out.println("after barrier"); - if (waitEndTime - waitStartTime > 10000000) { + if (waitEndTime - waitStartTime >= 10000000) { startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE .withDescription("Timed out waiting for second handshake thread. " + "The transport executor pool may have run out of threads")); @@ -535,6 +539,8 @@ public void run() { latch.await(); } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + throw new RuntimeException(e); } // Use closed source on failure so that the reader immediately shuts down. BufferedSource source = Okio.buffer(new Source() { @@ -600,11 +606,11 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); } synchronized (lock) { + latchForExtraThread.countDown(); socket = Preconditions.checkNotNull(sock, "socket"); if (sslSession != null) { securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); } - latchForExtraThread.countDown(); } } }); @@ -613,6 +619,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() @Override public void run() { try { + System.out.println("In other thread"); barrier.await(); latchForExtraThread.await(); } catch (BrokenBarrierException | InterruptedException e) { From 10fbc53a98507c6cb5accad3029c57fb38608d67 Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Wed, 11 Sep 2024 18:21:09 +0000 Subject: [PATCH 6/8] Detect transport executors with no remaining threads --- cronet/build.gradle | 2 +- .../io/grpc/okhttp/OkHttpClientTransport.java | 33 +++++++------------ .../okhttp/OkHttpClientTransportTest.java | 12 +++++-- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/cronet/build.gradle b/cronet/build.gradle index ec5c090acc3..3252a9d249b 100644 --- a/cronet/build.gradle +++ b/cronet/build.gradle @@ -58,7 +58,7 @@ dependencies { task javadocs(type: Javadoc) { source = android.sourceSets.main.java.srcDirs - // classpath += files(android.getBootClasspath()) + classpath += files(android.getBootClasspath()) classpath += files({ android.libraryVariants.collect { variant -> variant.javaCompileProvider.get().classpath diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 2a0a7be4025..75f47c15f31 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; +import static java.rmi.server.LogStream.log; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -512,7 +513,6 @@ public Runnable start(Listener listener) { // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. - System.out.println("before executor"); serializingExecutor.execute(new Runnable() { @Override public void run() { @@ -521,26 +521,16 @@ public void run() { // network is not available during startup while another thread holding lock to send the // initial preface. try { - System.out.println("in serializing thread"); - long waitStartTime = System.nanoTime(); - barrier.await(10000000, TimeUnit.NANOSECONDS); + barrier.await(1000, TimeUnit.MILLISECONDS); long waitEndTime = System.nanoTime(); - System.out.println("after barrier"); - if (waitEndTime - waitStartTime >= 10000000) { - startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE - .withDescription("Timed out waiting for second handshake thread. " - + "The transport executor pool may have run out of threads")); - if (connectedFuture != null) { - connectedFuture.set(null); - } - System.out.println("started goaway"); - return; - } latch.await(); - } catch (InterruptedException | BrokenBarrierException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } catch (TimeoutException e) { - throw new RuntimeException(e); + } catch (TimeoutException | BrokenBarrierException e) { + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + return; } // Use closed source on failure so that the reader immediately shuts down. BufferedSource source = Okio.buffer(new Source() { @@ -619,10 +609,11 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() @Override public void run() { try { - System.out.println("In other thread"); - barrier.await(); + barrier.await(1000, TimeUnit.MILLISECONDS); latchForExtraThread.await(); - } catch (BrokenBarrierException | InterruptedException e) { + } catch (BrokenBarrierException | TimeoutException e) { + log("Something bad happened, maybe too few threads available!"); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 37f3e92f514..474c5cedacc 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -251,7 +251,16 @@ public void testToString() throws Exception { public void testTransportExecutorWithTooFewThreads() throws Exception { ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1); channelBuilder.transportExecutor(fixedPoolExecutor); - initTransport(); + InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); + clientTransport = new OkHttpClientTransport( + channelBuilder.buildTransportFactory(), + address, + "hostname", + null, + EAG_ATTRS, + NO_PROXY, + tooManyPingsRunnable); + clientTransport.start(transportListener); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture()); Status capturedStatus = statusCaptor.getValue(); @@ -1736,7 +1745,6 @@ public void unreachableServer() throws Exception { EAG_ATTRS, NO_PROXY, tooManyPingsRunnable); - //hi ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); From 2fd24b6c861a1371a8359c356d0b05500587e92f Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Fri, 13 Sep 2024 11:59:41 +0000 Subject: [PATCH 7/8] Detect transport executors with no remaining threads --- .../main/java/io/grpc/okhttp/OkHttpClientTransport.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 75f47c15f31..7e283b68602 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE; import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO; -import static java.rmi.server.LogStream.log; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; @@ -522,7 +521,6 @@ public void run() { // initial preface. try { barrier.await(1000, TimeUnit.MILLISECONDS); - long waitEndTime = System.nanoTime(); latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -594,9 +592,9 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() return; } finally { clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); + latchForExtraThread.countDown(); } synchronized (lock) { - latchForExtraThread.countDown(); socket = Preconditions.checkNotNull(sock, "socket"); if (sslSession != null) { securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); @@ -612,7 +610,8 @@ public void run() { barrier.await(1000, TimeUnit.MILLISECONDS); latchForExtraThread.await(); } catch (BrokenBarrierException | TimeoutException e) { - log("Something bad happened, maybe too few threads available!"); + // Something bad happened, maybe too few threads available! + // This will be handled in the handshake thread. } catch (InterruptedException e) { Thread.currentThread().interrupt(); } From 4d44e3de3a85842d3ce2516e2c6ec7fc2727267a Mon Sep 17 00:00:00 2001 From: MV Shiva Prasad Date: Fri, 13 Sep 2024 19:03:33 +0000 Subject: [PATCH 8/8] Detect transport executors with no remaining threads --- okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 7e283b68602..2f6b836dc3a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -520,8 +520,8 @@ public void run() { // network is not available during startup while another thread holding lock to send the // initial preface. try { - barrier.await(1000, TimeUnit.MILLISECONDS); latch.await(); + barrier.await(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (TimeoutException | BrokenBarrierException e) {