-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue #10513 - fix multipart lockup with HTTP/2 #10554
Changes from 4 commits
ddd6b64
7682591
ef4ef1e
82f5f65
3a7cc0a
9d59e41
705f53b
68bb382
00510b8
7799f62
68de6af
b288b14
f400e4c
8d7523f
28c9fe9
66067fd
26e2457
7b689f6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Consumer; | ||
|
@@ -63,6 +64,7 @@ | |
import org.eclipse.jetty.io.WriteFlusher; | ||
import org.eclipse.jetty.util.AtomicBiInteger; | ||
import org.eclipse.jetty.util.Atomics; | ||
import org.eclipse.jetty.util.BufferUtil; | ||
import org.eclipse.jetty.util.Callback; | ||
import org.eclipse.jetty.util.CountingCallback; | ||
import org.eclipse.jetty.util.MathUtils; | ||
|
@@ -2414,6 +2416,7 @@ private class StreamData extends Stream.Data | |
private final Stream.Data data; | ||
private final HTTP2Stream stream; | ||
private final int flowControlLength; | ||
private final AtomicBoolean consumed = new AtomicBoolean(false); | ||
|
||
private StreamData(Stream.Data data, HTTP2Stream stream, int flowControlLength) | ||
{ | ||
|
@@ -2436,6 +2439,7 @@ public boolean canRetain() | |
@Override | ||
public void retain() | ||
{ | ||
System.err.println("retain " + counter + " " + BufferUtil.toDetailString(data.frame().getByteBuffer())); | ||
counter.retain(); | ||
data.retain(); | ||
} | ||
|
@@ -2444,14 +2448,30 @@ public void retain() | |
public boolean release() | ||
{ | ||
data.release(); | ||
boolean result = counter.release(); | ||
if (result) | ||
|
||
System.err.println("release " + counter + " " + BufferUtil.toDetailString(data.frame().getByteBuffer())); | ||
if (counter.release()) | ||
{ | ||
notIdle(); | ||
stream.notIdle(); | ||
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); | ||
|
||
// once the reference count goes to zero, then it does not matter if the buffer is empty or not, as it is no | ||
// longer available to the application to read, and thus has been consumed (if not already consumed). | ||
if (consumed.compareAndSet(false, true)) | ||
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); | ||
return true; | ||
} | ||
return result; | ||
|
||
// If all the data has been consumes and at least one release call is made, that indicates that the content | ||
// has been fully consumed. The fact that release count has not gone to zero indicates that the application wants | ||
// to retain the data, but not prevent more data being read. | ||
// Note that a release with a non-empty buffer, may indicate the release of just a part of the buffer and thus | ||
// the data has not been consumed. An optimization might be to consume the data up to the current position on each | ||
// release. | ||
if (BufferUtil.isEmpty(data.frame().getByteBuffer()) && consumed.compareAndSet(false, true)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not so sure about this one. How about the following oddity? Buffer duplicate = buffer.slice();
buffer.get(...); // read everything
duplicate.get(...); // try to read again Wouldn't that be a nasty surprise? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After discussing this issue with @sbordet, we came to an agreement that the right place to open the flow control window is in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lorban I don't understand the surprise? If a make a duplicate, then of course I can read the data again from the buffer. @sbordet @lorban Might gut reaction to the idea of opening the window in Let me try to implement.... |
||
flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); | ||
|
||
return false; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,9 @@ | |
import java.nio.file.Paths; | ||
import java.nio.file.StandardOpenOption; | ||
import java.time.Duration; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Random; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.ExecutionException; | ||
|
@@ -39,6 +41,7 @@ | |
|
||
import org.eclipse.jetty.client.AsyncRequestContent; | ||
import org.eclipse.jetty.client.BufferingResponseListener; | ||
import org.eclipse.jetty.client.ByteBufferRequestContent; | ||
import org.eclipse.jetty.client.BytesRequestContent; | ||
import org.eclipse.jetty.client.ContentResponse; | ||
import org.eclipse.jetty.client.InputStreamRequestContent; | ||
|
@@ -50,6 +53,7 @@ | |
import org.eclipse.jetty.http.HttpFields; | ||
import org.eclipse.jetty.http.HttpHeader; | ||
import org.eclipse.jetty.http.HttpStatus; | ||
import org.eclipse.jetty.io.ByteBufferAccumulator; | ||
import org.eclipse.jetty.io.Content; | ||
import org.eclipse.jetty.server.Handler; | ||
import org.eclipse.jetty.server.Request; | ||
|
@@ -1193,6 +1197,98 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons | |
assertTrue(clientLatch.await(timeoutInSeconds, TimeUnit.SECONDS)); | ||
} | ||
|
||
@ParameterizedTest | ||
@MethodSource("transports") | ||
public void testUploadWithRetainedData(Transport transport) throws Exception | ||
{ | ||
List<Content.Chunk> chunks = new ArrayList<>(); | ||
|
||
start(transport, new Handler.Abstract() | ||
{ | ||
@Override | ||
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) | ||
{ | ||
Runnable retainer = new Runnable() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may find |
||
{ | ||
@Override | ||
public void run() | ||
{ | ||
while (true) | ||
{ | ||
Content.Chunk chunk = request.read(); | ||
if (chunk == null) | ||
{ | ||
request.demand(this); | ||
return; | ||
} | ||
|
||
if (chunk.hasRemaining()) | ||
{ | ||
ByteBuffer byteBuffer = chunk.getByteBuffer(); | ||
if (chunk.canRetain()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a test, surely we can retain? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know? Do all the transports provide retainable chunks? |
||
{ | ||
chunk.retain(); | ||
chunks.add(Content.Chunk.asChunk(byteBuffer.slice(), chunk.isLast(), chunk)); | ||
} | ||
else | ||
{ | ||
chunks.add(Content.Chunk.from(BufferUtil.copy(byteBuffer), chunk.isLast())); | ||
} | ||
BufferUtil.clear(byteBuffer); | ||
} | ||
chunk.release(); | ||
|
||
if (Content.Chunk.isFailure(chunk)) | ||
{ | ||
callback.failed(chunk.getFailure()); | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd move this just after the null check, as per usual idiom. |
||
|
||
if (chunk.isLast()) | ||
{ | ||
callback.succeeded(); | ||
return; | ||
} | ||
} | ||
} | ||
}; | ||
retainer.run(); | ||
return true; | ||
} | ||
}); | ||
|
||
byte[] data = new byte[16 * 1024 * 1024]; | ||
new Random().nextBytes(data); | ||
CountDownLatch latch = new CountDownLatch(1); | ||
ByteBufferRequestContent content = new ByteBufferRequestContent(ByteBuffer.wrap(data)); | ||
client.newRequest(newURI(transport)) | ||
.body(content) | ||
.send(new BufferingResponseListener(data.length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may use |
||
{ | ||
@Override | ||
public void onComplete(Result result) | ||
{ | ||
assertTrue(result.isSucceeded()); | ||
assertEquals(200, result.getResponse().getStatus()); | ||
latch.countDown(); | ||
} | ||
}); | ||
|
||
assertTrue(latch.await(30, TimeUnit.SECONDS)); | ||
|
||
try (ByteBufferAccumulator accumulator = new ByteBufferAccumulator()) | ||
{ | ||
for (Content.Chunk c : chunks) | ||
{ | ||
ByteBuffer byteBuffer = c.getByteBuffer(); | ||
accumulator.copyBuffer(byteBuffer); | ||
BufferUtil.clear(byteBuffer); | ||
c.release(); | ||
} | ||
assertArrayEquals(data, accumulator.toByteArray()); | ||
} | ||
} | ||
|
||
private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback) | ||
{ | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: If all the data has been consumed