-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Detect transport executors with no remaining threads #11503
Conversation
@@ -499,6 +499,29 @@ public Runnable start(Listener listener) { | |||
outboundFlow = new OutboundFlowController(this, frameWriter); | |||
} | |||
final CountDownLatch latch = new CountDownLatch(1); | |||
// This runs con-concurrently with handshake and works as a hack checking enough threads are |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/con-concurrently/concurrently/
@@ -499,6 +499,29 @@ public Runnable start(Listener listener) { | |||
outboundFlow = new OutboundFlowController(this, frameWriter); | |||
} | |||
final CountDownLatch latch = new CountDownLatch(1); | |||
// This runs con-concurrently with handshake and works as a hack checking enough threads are |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't check enough threads are available to start the transport. Maybe something closer to:
The transport needs up to two threads to function once started, but only needs one during handshaking. Start another thread during handshaking to make sure there's still a free thread available. If the number of threads is exhausted, it is better to kill the transport than for all the transports to hang unable to send.
More changes suggested in review.
} | ||
System.out.println("started goaway"); | ||
return; | ||
} | ||
latch.await(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to understand the reason for this previous latch
.
as it seems, this code waits for sendConnectionPrefaceAndSettings
execution, which happens below.
Can that be moved above this serializingExecutor.execute(new Runnable()
?
Have I missed anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Writes are performed by AsyncSink. When a thread wants to do a write, it is added to a queue and a Runnable is added to serializingExecutor
. So the Runnable here is running on that "same thread" and the writes can't happen until this proceeds.
The question I had a bit earlier yesterday was, "why don't we do sendConnectionPrefaceAndSettings() in this runnable, instead of waiting on the latch?" This construction guarantees that the first things written after the TCP/TLS handshake is the HTTP/2 handshake. Back when this code was written, RPCs would be sent on transports before the transport went READY, so the sendConnectionPrefaceAndSettings() needed to be enqueued before start() returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The introduction of DelayedClientTransport avoided using transports before they were READY. TransportSet is known as InternalSubchannel today. The activeTransportFuture.set() right after start() was the main case that sent RPCs to transports before they were ready.
cf787bd#diff-c993808318f59c8a206c5b4f1af4fd2b3f81a0fca3662be8966c392e3829e430R200
I feel like it took some years to be confident we had gotten rid of all the places that assumed a transport could support RPCs immediately after being started. But that was the most important change in that direction. The change was prompted by repeated memory leaks when dealing with ListenableFutures, as you can't remove listeners from futures. And in the process we fixed assigning RPCs too eagerly to transports.
(There had been some debate at the time whether we were okay with the added latency. Waiting for transport ready actually delays RPCs from being sent on a new connection by a RTT, because it waits to receive HTTP/2 SETTINGS from the server.)
Is another runnable actually needed here? The existing ClientFrameHandler can be started earlier, for example like this: @@ -164,7 +167,7 @@
private final ScheduledExecutorService scheduler;
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
- private ClientFrameHandler clientFrameHandler;
+ private final ClientFrameHandler clientFrameHandler = new ClientFrameHandler();
// Caution: Not synchronized, new value can only be safely read after the connection is complete.
private Attributes attributes;
/**
@@ -574,7 +577,7 @@
onException(e);
return;
} finally {
- clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
+ clientFrameHandler.readerAndStartSignal.add(variant.newReader(source, true));
}
synchronized (lock) {
socket = Preconditions.checkNotNull(sock, "socket");
@@ -591,15 +594,18 @@
latch.countDown();
}
+ // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
+ // may send goAway immediately.
+ executor.execute(clientFrameHandler);
+
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
if (connectingCallback != null) {
connectingCallback.run();
}
- // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
- // may send goAway immediately.
- executor.execute(clientFrameHandler);
+ clientFrameHandler.started.await(10, TimeUnit.SECONDS); // TODO error handling
+ clientFrameHandler.readerAndStartSignal.add("START");
synchronized (lock) {
maxConcurrentStreams = Integer.MAX_VALUE;
startPendingStreams();
@@ -1090,19 +1096,19 @@
private final OkHttpFrameLogger logger =
new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
- FrameReader frameReader;
+ final CountDownLatch started = new CountDownLatch(1);
+ final BlockingQueue<Object> readerAndStartSignal = new ArrayBlockingQueue<>(2);
boolean firstSettings = true;
- ClientFrameHandler(FrameReader frameReader) {
- this.frameReader = frameReader;
- }
-
@Override
@SuppressWarnings("Finally")
public void run() {
+ started.countDown();
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName("OkHttpClientTransport");
try {
+ FrameReader frameReader = (FrameReader) readerAndStartSignal.poll(1, TimeUnit.MINUTES);
+ readerAndStartSignal.poll(1, TimeUnit.MINUTES);
// Read until the underlying socket closes.
while (frameReader.nextFrame(this)) {
if (keepAliveManager != null) { |
@panchenko, it's unclear what you're optimizing for. The error handling looks harder to get right with reusing the reader. |
@ejona86 starting an intermediate runnable does not guarantee there will be an available thread for The error handling should be similar anyway. |
Oh, the concern isn't clientFrameHandler. If we can't get that thread, then at least the wedged transport is not impacting other transports. Also, we won't be able to receive the initial SETTINGS so the transport won't go READY... I guess we could have a timeout trigger in that case. The concern is the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When merging, use the "squash" setting and make sure to clean up the commit message (the default when there are multiple commits is a list of the commit messages of your commits, which is essentially never what we'd want).
Also, if you say "Fixes #11271" then merging the PR automatically closes the issue. See https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword |
Created a way to detect insufficient threads to start the transport for read and write simultaneously.
Fixes #11271