diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 29d3dbc1cdf..2f6b836dc3a 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -83,9 +83,13 @@ import java.util.Locale; import java.util.Map; import java.util.Random; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -499,8 +503,15 @@ public Runnable start(Listener listener) { outboundFlow = new OutboundFlowController(this, frameWriter); } final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch latchForExtraThread = new CountDownLatch(1); + // The transport needs up to two threads to function once started, + // but only needs one during handshaking. Start another thread during handshaking + // to make sure there's still a free thread available. If the number of threads is exhausted, + // it is better to kill the transport than for all the transports to hang unable to send. + CyclicBarrier barrier = new CyclicBarrier(2); // Connecting in the serializingExecutor, so that some stream operations like synStream // will be executed after connected. + serializingExecutor.execute(new Runnable() { @Override public void run() { @@ -510,8 +521,14 @@ public void run() { // initial preface. try { latch.await(); + barrier.await(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); + } catch (TimeoutException | BrokenBarrierException e) { + startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE + .withDescription("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads")); + return; } // Use closed source on failure so that the reader immediately shuts down. BufferedSource source = Okio.buffer(new Source() { @@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() return; } finally { clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true)); + latchForExtraThread.countDown(); } synchronized (lock) { socket = Preconditions.checkNotNull(sock, "socket"); @@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort() } } }); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + barrier.await(1000, TimeUnit.MILLISECONDS); + latchForExtraThread.await(); + } catch (BrokenBarrierException | TimeoutException e) { + // Something bad happened, maybe too few threads available! + // This will be handled in the handshake thread. + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }); // Schedule to send connection preface & settings before any other write. try { sendConnectionPrefaceAndSettings(); diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index ab7dff98444..474c5cedacc 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -247,6 +247,28 @@ public void testToString() throws Exception { assertTrue("Unexpected: " + s, s.contains(address.toString())); } + @Test + public void testTransportExecutorWithTooFewThreads() throws Exception { + ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1); + channelBuilder.transportExecutor(fixedPoolExecutor); + InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415); + clientTransport = new OkHttpClientTransport( + channelBuilder.buildTransportFactory(), + address, + "hostname", + null, + EAG_ATTRS, + NO_PROXY, + tooManyPingsRunnable); + clientTransport.start(transportListener); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture()); + Status capturedStatus = statusCaptor.getValue(); + assertEquals("Timed out waiting for second handshake thread. " + + "The transport executor pool may have run out of threads", + capturedStatus.getDescription()); + } + /** * Test logging is functioning correctly for client received Http/2 frames. Not intended to test * actual frame content being logged.