diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 3d758d2f258..f1014cd7621 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -83,7 +83,9 @@ import java.util.Locale; import java.util.Map; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; @@ -499,31 +501,16 @@ public Runnable start(Listener listener) { outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); - // This runs con-concurrently with handshake and works as a hack checking enough threads are - // available to start the transport. - executor.execute(new Runnable() { - @SuppressWarnings("WaitNotInLoop") - @Override - public void run() { - long waitStartTime = System.nanoTime(); - synchronized (lock) { - try { - lock.wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - long waitEndTime = System.nanoTime(); - if (waitEndTime - waitStartTime >= 100000000) { // never got notified - startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE - .withDescription("Timed out waiting for second handshake thread. " - + "The transport executor pool may have run out of threads")); - } - } - }); - + final CountDownLatch latchForExtraThread = new CountDownLatch(1); + // The transport needs up to two threads to function once started, + // but only needs one during handshaking. Start another thread during handshaking + // to make sure there's still a free thread available. If the number of threads is exhausted, + // it is better to kill the transport than for all the transports to hang unable to send. + CyclicBarrier barrier = new CyclicBarrier(2); // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. + + long waitStartTime = System.nanoTime(); serializingExecutor.execute(new Runnable() { @Override public void run() { @@ -532,8 +519,21 @@ public void run() { // network is not available during startup while another thread holding lock to send the // initial preface. try { + barrier.await(); + long waitEndTime = System.nanoTime(); + System.out.println("after barrier"); + if (waitEndTime - waitStartTime > 10000000) { + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + if (connectedFuture != null) { + connectedFuture.set(null); + } + System.out.println("started goaway"); + return; + } latch.await(); - } catch (InterruptedException e) { + } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } // Use closed source on failure so that the reader immediately shuts down. @@ -604,7 +604,19 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() if (sslSession != null) { securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession)); } - lock.notify(); + latchForExtraThread.countDown(); + } + } + }); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + latchForExtraThread.await(); + } catch (BrokenBarrierException | InterruptedException e) { + Thread.currentThread().interrupt(); } } }); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index f3ddf0d1dce..37f3e92f514 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -1736,6 +1736,7 @@ public void unreachableServer() throws Exception { EAG_ATTRS, NO_PROXY, tooManyPingsRunnable); + //hi ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener);