Skip to content

Commit

Permalink
Fixed HTTP/2 serialization in HttpReceiverOverHTTP2.
Browse files Browse the repository at this point in the history
Fixed reset race in HTTP2Stream.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 20, 2024
1 parent 8e036dd commit 630d5c5
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ public void onComplete(Result result)
{
HttpRequest request = (HttpRequest)result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getMediaType(), getEncoding());
if (result.getResponseFailure() != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Authentication challenge failed", result.getFailure());
forwardFailureComplete(request, result.getRequestFailure(), response, result.getResponseFailure());
return;
}

String authenticationAttribute = getAuthenticationAttribute();
HttpConversation conversation = request.getConversation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ public void onComplete(Result result)
{
Request request = result.getRequest();
Response response = result.getResponse();
if (result.getResponseFailure() == null)
redirector.redirect(request, response, null);
else
redirector.fail(request, response, result.getFailure());
redirector.redirect(request, response, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ protected void responseBegin(HttpExchange exchange)
responseState = ResponseState.BEGIN;
HttpResponse response = exchange.getResponse();
HttpConversation conversation = exchange.getConversation();
// Probe the protocol handlers
// Probe the protocol handlers.
HttpClient client = getHttpDestination().getHttpClient();
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
Response.Listener handlerListener = null;
Expand All @@ -170,7 +170,7 @@ protected void responseBegin(HttpExchange exchange)
conversation.updateResponseListeners(handlerListener);

if (LOG.isDebugEnabled())
LOG.debug("Response begin {}", response);
LOG.debug("Notifying response begin for {} on {}", exchange, this);
conversation.getResponseListeners().notifyBegin(response);
});
}
Expand All @@ -189,23 +189,23 @@ protected void responseBegin(HttpExchange exchange)
protected void responseHeader(HttpExchange exchange, HttpField field)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseHeader for {} on {}", field, this);
LOG.debug("Invoking responseHeader {} for {} on {}", field, exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseHeader on {}", this);
LOG.debug("Executing responseHeader for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;

responseState = ResponseState.HEADER;
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Notifying header {}", field);
LOG.debug("Notifying response header {} for {} on {}", field, exchange, this);
boolean process = exchange.getConversation().getResponseListeners().notifyHeader(response, field);
if (LOG.isDebugEnabled())
LOG.debug("Header {} notified, {}processing needed", field, (process ? "" : "no "));
LOG.debug("Notified response header {}, processing {}", field, (process ? "needed" : "skipped"));
if (process)
{
response.addHeader(field);
Expand Down Expand Up @@ -241,12 +241,12 @@ protected void storeCookie(URI uri, HttpField field)
protected void responseHeaders(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseHeaders on {}", this);
LOG.debug("Invoking responseHeaders for {} on {}", exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseHeaders on {}", this);
LOG.debug("Executing responseHeaders for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;
Expand Down Expand Up @@ -288,6 +288,8 @@ protected void responseHeaders(HttpExchange exchange)
}
}

if (LOG.isDebugEnabled())
LOG.debug("Notifying response headers for {} on {}", exchange, this);
ResponseListeners responseListeners = exchange.getConversation().getResponseListeners();
responseListeners.notifyHeaders(response);

Expand All @@ -298,6 +300,7 @@ protected void responseHeaders(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Interim response status {}, succeeding", response.getStatus());
// TODO: explain it's queued.
responseSuccess(exchange, this::onInterim);
return;
}
Expand All @@ -311,12 +314,12 @@ protected void responseHeaders(HttpExchange exchange)
if (decoderFactory != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Decoding {} response content", decoderFactory.getEncoding());
LOG.debug("Decoding {} response content for {} on {}", decoderFactory.getEncoding(), exchange, this);
contentSource = new DecodedContentSource(decoderFactory.newDecoderContentSource(rawContentSource), response);
}

if (LOG.isDebugEnabled())
LOG.debug("Response content {} {}", response, contentSource);
LOG.debug("Notifying response content {} for {} on {}", contentSource, exchange, this);
responseListeners.notifyContentSource(response, contentSource);
});
}
Expand All @@ -327,21 +330,22 @@ protected void responseHeaders(HttpExchange exchange)
* This method takes care of ensuring the {@link Content.Source} passed to
* {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)}
* calls the demand callback.
* The call to the demand callback is serialized with other events.
*/
protected void responseContentAvailable(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseContentAvailable on {}", this);
LOG.debug("Invoking responseContentAvailable for {} on {}", exchange, this);

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseContentAvailable on {}", this);
LOG.debug("Executing responseContentAvailable for {} on {}", exchange, this);

if (exchange.isResponseCompleteOrTerminated())
return;

if (LOG.isDebugEnabled())
LOG.debug("Notifying data available for {} on {}", exchange, this);
rawContentSource.onDataAvailable();
});
}
Expand All @@ -358,7 +362,7 @@ protected void responseContentAvailable(HttpExchange exchange)
protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseSuccess on {}", this);
LOG.debug("Invoking responseSuccess for {} on {}", exchange, this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
Expand All @@ -368,15 +372,15 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
Runnable successTask = () ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this);
LOG.debug("Executing responseSuccess for {} on {}", exchange, this);

responseState = ResponseState.IDLE;

reset();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response);
LOG.debug("Notifying response success for {} on {}", exchange, this);
exchange.getConversation().getResponseListeners().notifySuccess(response);

// Interim responses do not terminate the exchange.
Expand All @@ -403,11 +407,11 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
*/
protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing with {} on {}", failure, this);

HttpExchange exchange = getHttpExchange();

if (LOG.isDebugEnabled())
LOG.debug("Response failure {} for {} on {}", failure, exchange, this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (exchange != null && exchange.responseComplete(failure))
Expand Down Expand Up @@ -492,7 +496,7 @@ private void cleanup()
public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking abort with {} on {}", failure, this);
LOG.debug("Invoking abort for {} on {}", exchange, this, failure);

if (!exchange.isResponseCompleteOrTerminated())
throw new IllegalStateException();
Expand All @@ -510,13 +514,15 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro

responseState = ResponseState.FAILURE;
this.failure = failure;

if (contentSource != null)
contentSource.fail(failure);

dispose();

HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response abort {} {} on {}", response, exchange, getHttpChannel(), failure);
LOG.debug("Notifying response failure {} for {} on {}", failure, exchange, this);
exchange.getConversation().getResponseListeners().notifyFailure(response, failure);

// Mark atomically the response as terminated, with
Expand Down Expand Up @@ -700,10 +706,6 @@ public boolean rewind()
}
}

/**
* This Content.Source implementation guarantees that all {@link #read(boolean)} calls
* happening from a {@link #demand(Runnable)} callback must be serialized.
*/
private class ContentSource implements Content.Source, Invocable
{
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);
Expand Down Expand Up @@ -761,9 +763,9 @@ private void onDataAvailable()
public InvocationType getInvocationType()
{
Runnable demandCallback = demandCallbackRef.get();
if (demandCallback == null)
return Invocable.getInvocationType(getHttpChannel().getConnection());
return Invocable.getInvocationType(demandCallback);
if (demandCallback != null)
return Invocable.getInvocationType(demandCallback);
return Invocable.getInvocationType(getHttpChannel().getConnection());
}

@Override
Expand All @@ -775,8 +777,6 @@ public void demand(Runnable demandCallback)
throw new IllegalArgumentException();
if (!demandCallbackRef.compareAndSet(null, demandCallback))
throw new IllegalStateException();
// The processDemand method may call HttpReceiver.read(boolean)
// so it must be called by the invoker.
invoker.run(processDemand);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,17 @@ public boolean handle(org.eclipse.jetty.server.Request request, org.eclipse.jett
return true;
}
});

client.setConnectBlocking(true);
ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort());

assertNotNull(response);
assertEquals(200, response.getStatus());
byte[] content = response.getContent();
assertArrayEquals(data, content);
for (int i = 0; i < 2; ++i)
{
ContentResponse response = client.GET(scenario.getScheme() + "://localhost:" + connector.getLocalPort());

assertNotNull(response);
assertEquals(200, response.getStatus());
byte[] content = response.getContent();
assertArrayEquals(data, content);
}
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.logging.StacklessLogging;
import org.eclipse.jetty.server.Handler;
Expand Down Expand Up @@ -498,20 +499,19 @@ public boolean handle(org.eclipse.jetty.server.Request request, Response respons
@Override
public void onComplete(Result result)
{
// Fake the fact that the redirect failed.
// Fake the fact that the redirect failed,
// but the redirect should still be followed.
Result newResult = new Result(result, cause);
super.onComplete(newResult);
}
});

ExecutionException e = assertThrows(ExecutionException.class, () ->
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/redirect")
.timeout(5, TimeUnit.SECONDS)
.send();
});
assertSame(cause, e.getCause());
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/redirect")
.timeout(5, TimeUnit.SECONDS)
.send();

assertEquals(HttpStatus.OK_200, response.getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -43,16 +44,21 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.Client
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class);

private final SerializedInvoker invoker;

public HttpReceiverOverHTTP2(HttpChannel channel)
{
super(channel);
Executor executor = channel.getHttpDestination().getHttpClient().getExecutor();
invoker = new SerializedInvoker(HttpReceiverOverHTTP2.class.getName(), executor);
}

@Override
Expand Down Expand Up @@ -140,7 +146,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback
return null;
}

return new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () ->
return invoker.offer(new Invocable.ReadyTask(getHttpConnection().getInvocationType(), () ->
{
MetaData.Response response = (MetaData.Response)frame.getMetaData();
HttpResponse httpResponse = exchange.getResponse();
Expand Down Expand Up @@ -174,7 +180,7 @@ private Runnable onResponse(Stream stream, HeadersFrame frame, Callback callback
responseHeaders(exchange);

callback.succeeded();
});
}));
}

private Runnable onTrailer(HeadersFrame frame, Callback callback)
Expand Down Expand Up @@ -240,7 +246,7 @@ public Runnable onDataAvailable()
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return null;
return new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange));
return invoker.offer(new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange)));
}

@Override
Expand All @@ -252,7 +258,8 @@ public Runnable onReset(ResetFrame frame, Callback callback)
callback.succeeded();
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->

return invoker.offer(() ->
{
int error = frame.getError();
IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error));
Expand All @@ -269,12 +276,12 @@ public Runnable onTimeout(TimeoutException failure, Promise<Boolean> promise)
promise.succeeded(false);
return null;
}
return () -> promise.completeWith(exchange.getRequest().abort(failure));
return invoker.offer(() -> promise.completeWith(exchange.getRequest().abort(failure)));
}

@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
return () -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed));
return invoker.offer(() -> responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed)));
}
}
Loading

0 comments on commit 630d5c5

Please sign in to comment.