Skip to content

Commit

Permalink
Add rest.client.max-chunk-size property
Browse files Browse the repository at this point in the history
  • Loading branch information
ejba committed Sep 18, 2023
1 parent fd18895 commit ae0f6bb
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ static QuarkusRestClientBuilder newBuilder() {
*/
QuarkusRestClientBuilder proxyPassword(String proxyPassword);

QuarkusRestClientBuilder maxChunkSize(int maxChunkSize);

/**
* Specifies the hosts to access without proxy.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ public QuarkusRestClientBuilder proxyPassword(String proxyPassword) {
return this;
}

@Override
public QuarkusRestClientBuilder maxChunkSize(int maxChunkSize) {
proxy.maxChunkSize(maxChunkSize);
return this;
}

@Override
public QuarkusRestClientBuilder proxyUser(String proxyUser) {
proxy.proxyUser(proxyUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class RestClientBuilderImpl implements RestClientBuilder {

private ClientLogger clientLogger;

private int maxChunkSize;

@Override
public RestClientBuilderImpl baseUrl(URL url) {
try {
Expand Down Expand Up @@ -165,6 +167,11 @@ public RestClientBuilderImpl clientLogger(ClientLogger clientLogger) {
return this;
}

public RestClientBuilderImpl maxChunkSize(int maxChunkSize) {
this.maxChunkSize = maxChunkSize;
return this;
}

@Override
public RestClientBuilderImpl executorService(ExecutorService executor) {
throw new IllegalArgumentException("Specifying executor service is not supported. " +
Expand Down Expand Up @@ -360,6 +367,10 @@ public <T> T build(Class<T> aClass) throws IllegalStateException, RestClientDefi
clientBuilder.setUserAgent(restClientsConfig.userAgent.get());
}

if (getConfiguration().hasProperty(QuarkusRestClientProperties.MAX_CHUNK_SIZE)) {
clientBuilder.maxChunkSize((Integer) getConfiguration().getProperty(QuarkusRestClientProperties.MAX_CHUNK_SIZE));
}

if (getConfiguration().hasProperty(QuarkusRestClientProperties.HTTP2)) {
clientBuilder.http2((Boolean) getConfiguration().getProperty(QuarkusRestClientProperties.HTTP2));
} else if (restClientsConfig.http2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

public class QuarkusRestClientProperties {

/**
* Configures the maximum chunk size.
*/
public static final String MAX_CHUNK_SIZE = "io.quarkus.rest.client.max-chunk-size";

/**
* Configure the connect timeout in ms.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,11 @@ public class ClientSendRequestHandler implements ClientRestHandler {
private final LoggingScope loggingScope;
private final ClientLogger clientLogger;
private final Map<Class<?>, MultipartResponseData> multipartResponseDataMap;
private final int maxChunkSize;

public ClientSendRequestHandler(boolean followRedirects, LoggingScope loggingScope, ClientLogger logger,
public ClientSendRequestHandler(int maxChunkSize, boolean followRedirects, LoggingScope loggingScope, ClientLogger logger,
Map<Class<?>, MultipartResponseData> multipartResponseDataMap) {
this.maxChunkSize = maxChunkSize;
this.followRedirects = followRedirects;
this.loggingScope = loggingScope;
this.clientLogger = logger;
Expand Down Expand Up @@ -457,7 +459,7 @@ private QuarkusMultipartFormUpload setMultipartHeadersAndPrepareBody(HttpClientR
mode = (PausableHttpPostRequestEncoder.EncoderMode) property;
}
QuarkusMultipartFormUpload multipartFormUpload = new QuarkusMultipartFormUpload(Vertx.currentContext(), multipartForm,
true, mode);
true, maxChunkSize, mode);
httpClientRequest.setChunked(multipartFormUpload.isChunked());
setEntityRelatedHeaders(headerMap, state.getEntity());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class ClientBuilderImpl extends ClientBuilder {

private LoggingScope loggingScope;
private Integer loggingBodySize = 100;
private int maxChunkSize = 8096;
private MultiQueryParamMode multiQueryParamMode;

private ClientLogger clientLogger = new DefaultClientLogger();
Expand Down Expand Up @@ -196,6 +197,11 @@ public ClientBuilder enableCompression() {
return this;
}

public ClientBuilder maxChunkSize(int maxChunkSize) {
this.maxChunkSize = maxChunkSize;
return this;
}

@Override
public ClientImpl build() {
HttpClientOptions options = Optional.ofNullable(configuration.getFromContext(HttpClientOptions.class))
Expand Down Expand Up @@ -287,6 +293,7 @@ public ClientImpl build() {

clientLogger.setBodySize(loggingBodySize);

options.setMaxChunkSize(maxChunkSize);
return new ClientImpl(options,
configuration,
CLIENT_CONTEXT_RESOLVER.resolve(Thread.currentThread().getContextClassLoader()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public Vertx get() {
});
}

handlerChain = new HandlerChain(followRedirects, loggingScope, clientContext.getMultipartResponsesData(), clientLogger);
handlerChain = new HandlerChain(options.getMaxChunkSize(), followRedirects, loggingScope,
clientContext.getMultipartResponsesData(), clientLogger);
}

public ClientContext getClientContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ class HandlerChain {

private ClientRestHandler preClientSendHandler = null;

public HandlerChain(boolean followRedirects, LoggingScope loggingScope,
public HandlerChain(int maxChunkSize, boolean followRedirects, LoggingScope loggingScope,
Map<Class<?>, MultipartResponseData> multipartData, ClientLogger clientLogger) {
this.clientCaptureCurrentContextRestHandler = new ClientCaptureCurrentContextRestHandler();
this.clientSwitchToRequestContextRestHandler = new ClientSwitchToRequestContextRestHandler();
this.clientSendHandler = new ClientSendRequestHandler(followRedirects, loggingScope, clientLogger, multipartData);
this.clientSendHandler = new ClientSendRequestHandler(maxChunkSize, followRedirects, loggingScope, clientLogger,
multipartData);
this.clientSetResponseEntityRestHandler = new ClientSetResponseEntityRestHandler();
this.clientResponseCompleteRestHandler = new ClientResponseCompleteRestHandler();
this.clientErrorHandler = new ClientErrorHandler(loggingScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ public enum EncoderMode {
*/
private boolean isChunked;

private int maxChunkSize;

/**
* InterfaceHttpData for Body (without encoding)
*/
Expand Down Expand Up @@ -263,8 +265,8 @@ public enum EncoderMode {
* if the request is a TRACE
*/
public PausableHttpPostRequestEncoder(
HttpDataFactory factory, HttpRequest request, boolean multipart, Charset charset,
EncoderMode encoderMode)
HttpDataFactory factory, HttpRequest request, boolean multipart, int maxChunkSize,
Charset charset, EncoderMode encoderMode)
throws ErrorDataEncoderException {
this.request = checkNotNull(request, "request");
this.charset = checkNotNull(charset, "charset");
Expand All @@ -278,6 +280,7 @@ public PausableHttpPostRequestEncoder(
isLastChunk = false;
isLastChunkSent = false;
isMultipart = multipart;
this.maxChunkSize = maxChunkSize;
multipartHttpDatas = new ArrayList<>();
this.encoderMode = encoderMode;
if (isMultipart) {
Expand Down Expand Up @@ -842,7 +845,7 @@ public HttpRequest finalizeRequest() throws ErrorDataEncoderException {
iterator = multipartHttpDatas.listIterator();

headers.set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(realSize));
if (realSize > QuarkusHttpPostBodyUtil.chunkSize || isMultipart) {
if (realSize > maxChunkSize || isMultipart) {
isChunked = true;
if (transferEncoding != null) {
headers.remove(HttpHeaderNames.TRANSFER_ENCODING);
Expand Down Expand Up @@ -926,8 +929,8 @@ private String encodeAttribute(String s, Charset charset) throws ErrorDataEncode
*/
private ByteBuf fillByteBuf() {
int length = currentBuffer.readableBytes();
if (length > QuarkusHttpPostBodyUtil.chunkSize) {
return currentBuffer.readRetainedSlice(QuarkusHttpPostBodyUtil.chunkSize);
if (length > maxChunkSize) {
return currentBuffer.readRetainedSlice(maxChunkSize);
} else {
// to continue
ByteBuf slice = currentBuffer;
Expand Down Expand Up @@ -981,7 +984,7 @@ private HttpContent encodeNextChunkMultipart(int sizeleft) throws ErrorDataEncod
} else {
currentBuffer = wrappedBuffer(currentBuffer, buffer);
}
if (currentBuffer.readableBytes() < QuarkusHttpPostBodyUtil.chunkSize) {
if (currentBuffer.readableBytes() < maxChunkSize) {
currentData = null;
return null;
}
Expand Down Expand Up @@ -1018,7 +1021,7 @@ private HttpContent encodeNextChunkUrlEncoded(int sizeleft) throws ErrorDataEnco
}
// continue
size -= buffer.readableBytes() + 1;
if (currentBuffer.readableBytes() >= QuarkusHttpPostBodyUtil.chunkSize) {
if (currentBuffer.readableBytes() >= maxChunkSize) {
buffer = fillByteBuf();
return new DefaultHttpContent(buffer);
}
Expand Down Expand Up @@ -1052,7 +1055,7 @@ private HttpContent encodeNextChunkUrlEncoded(int sizeleft) throws ErrorDataEnco
currentBuffer = wrappedBuffer(currentBuffer, delimiter);
}
}
if (currentBuffer.readableBytes() >= QuarkusHttpPostBodyUtil.chunkSize) {
if (currentBuffer.readableBytes() >= maxChunkSize) {
buffer = fillByteBuf();
return new DefaultHttpContent(buffer);
}
Expand All @@ -1075,7 +1078,7 @@ private HttpContent encodeNextChunkUrlEncoded(int sizeleft) throws ErrorDataEnco
}

// end for current InterfaceHttpData, need more data
if (currentBuffer.readableBytes() < QuarkusHttpPostBodyUtil.chunkSize) {
if (currentBuffer.readableBytes() < maxChunkSize) {
currentData = null;
isKey = true;
return null;
Expand Down Expand Up @@ -1183,7 +1186,7 @@ private HttpContent nextChunk() throws ErrorDataEncoderException {
}

private int calculateRemainingSize() {
int size = QuarkusHttpPostBodyUtil.chunkSize;
int size = maxChunkSize;
if (currentBuffer != null) {
size -= currentBuffer.readableBytes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
*/
final class QuarkusHttpPostBodyUtil {

public static final int chunkSize = 8096;

/**
* Default Content-Type in binary form
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class QuarkusMultipartFormUpload implements ReadStream<Buffer>, Runnable
public QuarkusMultipartFormUpload(Context context,
QuarkusMultipartForm parts,
boolean multipart,
int maxChunkSize,
PausableHttpPostRequestEncoder.EncoderMode encoderMode) throws Exception {
this.context = context;
this.pending = new InboundBuffer<>(context)
Expand All @@ -63,7 +64,8 @@ public FileUpload createFileUpload(HttpRequest request, String name, String file
size);
}
};
this.encoder = new PausableHttpPostRequestEncoder(httpDataFactory, request, multipart, charset, encoderMode);
this.encoder = new PausableHttpPostRequestEncoder(httpDataFactory, request, multipart, maxChunkSize, charset,
encoderMode);
for (QuarkusMultipartFormDataPart formDataPart : parts) {
if (formDataPart.isAttribute()) {
encoder.addBodyAttribute(formDataPart.name(), formDataPart.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class HandlerChainTest {
@Test
public void preSendHandlerIsAlwaysFirst() throws Exception {

var chain = new HandlerChain(true, LoggingScope.NONE, Collections.emptyMap(), new DefaultClientLogger());
var chain = new HandlerChain(8096, true, LoggingScope.NONE, Collections.emptyMap(), new DefaultClientLogger());

ClientRestHandler preHandler = ctx -> {
};
Expand Down

0 comments on commit ae0f6bb

Please sign in to comment.