Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect transport executors with no remaining threads #11503

Merged
merged 8 commits into from
Sep 16, 2024
24 changes: 24 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,29 @@ public Runnable start(Listener listener) {
outboundFlow = new OutboundFlowController(this, frameWriter);
}
final CountDownLatch latch = new CountDownLatch(1);
// This runs con-concurrently with handshake and works as a hack checking enough threads are
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/con-concurrently/concurrently/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't check enough threads are available to start the transport. Maybe something closer to:

The transport needs up to two threads to function once started, but only needs one during handshaking. Start another thread during handshaking to make sure there's still a free thread available. If the number of threads is exhausted, it is better to kill the transport than for all the transports to hang unable to send.

// available to start the transport.
executor.execute(new Runnable() {
@Override
public void run() {
long waitStartTime = System.nanoTime();
synchronized (lock) {
try {
lock.wait(100);
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
long waitEndTime = System.nanoTime();
System.out.println(waitEndTime - waitStartTime);
shivaspeaks marked this conversation as resolved.
Show resolved Hide resolved
if (waitEndTime - waitStartTime >= 100000000) { // never got notified
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
.withDescription("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads"));
}
}
});

// Connecting in the serializingExecutor, so that some stream operations like synStream
// will be executed after connected.
serializingExecutor.execute(new Runnable() {
Expand Down Expand Up @@ -581,6 +604,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
if (sslSession != null) {
securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
}
lock.notify();
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
}
}
});
Expand Down
13 changes: 13 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,19 @@ public void testToString() throws Exception {
assertTrue("Unexpected: " + s, s.contains(address.toString()));
}

@Test
public void testTransportExecutorWithTooFewThreads() throws Exception {
ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1);
channelBuilder.transportExecutor(fixedPoolExecutor);
initTransport();
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture());
Status capturedStatus = statusCaptor.getValue();
assertEquals("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads",
capturedStatus.getDescription());
}

/**
* Test logging is functioning correctly for client received Http/2 frames. Not intended to test
* actual frame content being logged.
Expand Down
Loading