Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect transport executors with no remaining threads #11503

Merged
merged 8 commits into from
Sep 16, 2024
33 changes: 33 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -499,8 +503,15 @@ public Runnable start(Listener listener) {
outboundFlow = new OutboundFlowController(this, frameWriter);
}
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latchForExtraThread = new CountDownLatch(1);
// The transport needs up to two threads to function once started,
// but only needs one during handshaking. Start another thread during handshaking
// to make sure there's still a free thread available. If the number of threads is exhausted,
// it is better to kill the transport than for all the transports to hang unable to send.
CyclicBarrier barrier = new CyclicBarrier(2);
// Connecting in the serializingExecutor, so that some stream operations like synStream
// will be executed after connected.

serializingExecutor.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -510,8 +521,14 @@ public void run() {
// initial preface.
try {
latch.await();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to understand the reason for this previous latch.
as it seems, this code waits for sendConnectionPrefaceAndSettings execution, which happens below.
Can that be moved above this serializingExecutor.execute(new Runnable() ?
Have I missed anything?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writes are performed by AsyncSink. When a thread wants to do a write, it is added to a queue and a Runnable is added to serializingExecutor. So the Runnable here is running on that "same thread" and the writes can't happen until this proceeds.

The question I had a bit earlier yesterday was, "why don't we do sendConnectionPrefaceAndSettings() in this runnable, instead of waiting on the latch?" This construction guarantees that the first things written after the TCP/TLS handshake is the HTTP/2 handshake. Back when this code was written, RPCs would be sent on transports before the transport went READY, so the sendConnectionPrefaceAndSettings() needed to be enqueued before start() returned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The introduction of DelayedClientTransport avoided using transports before they were READY. TransportSet is known as InternalSubchannel today. The activeTransportFuture.set() right after start() was the main case that sent RPCs to transports before they were ready.
cf787bd#diff-c993808318f59c8a206c5b4f1af4fd2b3f81a0fca3662be8966c392e3829e430R200

I feel like it took some years to be confident we had gotten rid of all the places that assumed a transport could support RPCs immediately after being started. But that was the most important change in that direction. The change was prompted by repeated memory leaks when dealing with ListenableFutures, as you can't remove listeners from futures. And in the process we fixed assigning RPCs too eagerly to transports.

(There had been some debate at the time whether we were okay with the added latency. Waiting for transport ready actually delays RPCs from being sent on a new connection by a RTT, because it waits to receive HTTP/2 SETTINGS from the server.)

ejona86 marked this conversation as resolved.
Show resolved Hide resolved
barrier.await(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException | BrokenBarrierException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
.withDescription("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads"));
return;
}
// Use closed source on failure so that the reader immediately shuts down.
BufferedSource source = Okio.buffer(new Source() {
Expand Down Expand Up @@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
return;
} finally {
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
latchForExtraThread.countDown();
}
synchronized (lock) {
socket = Preconditions.checkNotNull(sock, "socket");
Expand All @@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
}
}
});

executor.execute(new Runnable() {
@Override
public void run() {
try {
barrier.await(1000, TimeUnit.MILLISECONDS);
latchForExtraThread.await();
} catch (BrokenBarrierException | TimeoutException e) {
// Something bad happened, maybe too few threads available!
// This will be handled in the handshake thread.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// Schedule to send connection preface & settings before any other write.
try {
sendConnectionPrefaceAndSettings();
Expand Down
22 changes: 22 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,28 @@ public void testToString() throws Exception {
assertTrue("Unexpected: " + s, s.contains(address.toString()));
}

@Test
public void testTransportExecutorWithTooFewThreads() throws Exception {
ExecutorService fixedPoolExecutor = Executors.newFixedThreadPool(1);
channelBuilder.transportExecutor(fixedPoolExecutor);
InetSocketAddress address = InetSocketAddress.createUnresolved("hostname", 31415);
clientTransport = new OkHttpClientTransport(
channelBuilder.buildTransportFactory(),
address,
"hostname",
null,
EAG_ATTRS,
NO_PROXY,
tooManyPingsRunnable);
clientTransport.start(transportListener);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(statusCaptor.capture());
Status capturedStatus = statusCaptor.getValue();
assertEquals("Timed out waiting for second handshake thread. "
+ "The transport executor pool may have run out of threads",
capturedStatus.getDescription());
}

/**
* Test logging is functioning correctly for client received Http/2 frames. Not intended to test
* actual frame content being logged.
Expand Down