diff --git a/http-client-core/src/main/java/io/micronaut/http/client/RawHttpClientFactory.java b/http-client-core/src/main/java/io/micronaut/http/client/RawHttpClientFactory.java index f3f1432e29f..b31c631e169 100644 --- a/http-client-core/src/main/java/io/micronaut/http/client/RawHttpClientFactory.java +++ b/http-client-core/src/main/java/io/micronaut/http/client/RawHttpClientFactory.java @@ -38,7 +38,9 @@ public interface RawHttpClientFactory { * @return The client */ @NonNull - RawHttpClient createRawClient(@Nullable URI url); + default RawHttpClient createRawClient(@Nullable URI url) { + return createRawClient(url, new DefaultHttpClientConfiguration()); + } /** * Create a new {@link RawHttpClient} with the specified configuration. Note that this method should only be used diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/AbstractJdkHttpClient.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/AbstractJdkHttpClient.java index d416d7d065b..d97d7d2f50c 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/AbstractJdkHttpClient.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/AbstractJdkHttpClient.java @@ -110,6 +110,25 @@ abstract class AbstractJdkHttpClient { protected MediaTypeCodecRegistry mediaTypeCodecRegistry; protected MessageBodyHandlerRegistry messageBodyHandlerRegistry; + protected AbstractJdkHttpClient(AbstractJdkHttpClient prototype) { + this.loadBalancer = prototype.loadBalancer; + this.httpVersion = prototype.httpVersion; + this.configuration = prototype.configuration; + this.contextPath = prototype.contextPath; + this.client = prototype.client; + this.cookieManager = prototype.cookieManager; + this.requestBinderRegistry = prototype.requestBinderRegistry; + this.clientId = prototype.clientId; + this.conversionService = prototype.conversionService; + this.sslBuilder = prototype.sslBuilder; + this.log = prototype.log; + this.filterResolver = prototype.filterResolver; + this.clientFilterEntries = prototype.clientFilterEntries; + this.cookieDecoder = prototype.cookieDecoder; + this.mediaTypeCodecRegistry = prototype.mediaTypeCodecRegistry; + this.messageBodyHandlerRegistry = prototype.messageBodyHandlerRegistry; + } + /** * @param log the logger to use * @param loadBalancer The {@link LoadBalancer} to use for selecting servers diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/BaseHttpResponseAdapter.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/BaseHttpResponseAdapter.java new file mode 100644 index 00000000000..82f2513ecb1 --- /dev/null +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/BaseHttpResponseAdapter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.jdk; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.convert.value.MutableConvertibleValues; +import io.micronaut.core.convert.value.MutableConvertibleValuesMap; +import io.micronaut.http.HttpHeaders; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.HttpStatus; + +/** + * Base {@link HttpResponse} implementation for the JDK client, for streaming and buffered + * responses. + * + * @param The JDK body type + * @param The Micronaut HttpResponse body type + */ +@Internal +abstract class BaseHttpResponseAdapter implements HttpResponse { + final java.net.http.HttpResponse httpResponse; + final ConversionService conversionService; + final MutableConvertibleValues attributes = new MutableConvertibleValuesMap<>(); + + BaseHttpResponseAdapter(java.net.http.HttpResponse httpResponse, + ConversionService conversionService) { + this.httpResponse = httpResponse; + this.conversionService = conversionService; + } + + @Override + public HttpStatus getStatus() { + return HttpStatus.valueOf(httpResponse.statusCode()); + } + + @Override + public int code() { + return httpResponse.statusCode(); + } + + @Override + public String reason() { + return getStatus().getReason(); + } + + @Override + public HttpHeaders getHeaders() { + return new HttpHeadersAdapter(httpResponse.headers(), conversionService); + } + + @Override + public MutableConvertibleValues getAttributes() { + return attributes; + } +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/DefaultJdkHttpClientRegistry.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/DefaultJdkHttpClientRegistry.java index 4039f60b768..24c8bbd8959 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/DefaultJdkHttpClientRegistry.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/DefaultJdkHttpClientRegistry.java @@ -25,6 +25,7 @@ import io.micronaut.core.annotation.AnnotationMetadata; import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; import io.micronaut.core.annotation.Order; import io.micronaut.core.convert.ConversionService; @@ -44,6 +45,8 @@ import io.micronaut.http.client.HttpVersionSelection; import io.micronaut.http.client.LoadBalancer; import io.micronaut.http.client.LoadBalancerResolver; +import io.micronaut.http.client.RawHttpClient; +import io.micronaut.http.client.RawHttpClientRegistry; import io.micronaut.http.client.annotation.Client; import io.micronaut.http.client.exceptions.HttpClientException; import io.micronaut.http.client.filter.ClientFilterResolutionContext; @@ -81,7 +84,7 @@ @Order(2) // If both this and the netty client are present, netty is the default. @Internal @Experimental -public final class DefaultJdkHttpClientRegistry implements AutoCloseable, HttpClientRegistry { +public final class DefaultJdkHttpClientRegistry implements AutoCloseable, HttpClientRegistry, RawHttpClientRegistry { private static final Logger LOG = LoggerFactory.getLogger(DefaultJdkHttpClientRegistry.class); @@ -152,6 +155,28 @@ protected DefaultJdkHttpClient httpClient( return resolveDefaultHttpClient(injectionPoint, loadBalancer, configuration, beanContext); } + /** + * Creates a {@literal java.net.http.*} HTTP Client. + * + * @param injectionPoint + * @param loadBalancer + * @param configuration + * @param beanContext + * @return A {@literal java.net.http.*} HTTP Client + */ + @Bean + @BootstrapContextCompatible + @Primary + @Order(2) // If both this and the netty client are present, netty is the default. + RawHttpClient rawHttpClient( + @Nullable InjectionPoint injectionPoint, + @Parameter @Nullable LoadBalancer loadBalancer, + @Parameter @Nullable HttpClientConfiguration configuration, + BeanContext beanContext + ) { + return new JdkRawHttpClient(resolveDefaultHttpClient(injectionPoint, loadBalancer, configuration, beanContext)); + } + private DefaultJdkHttpClient resolveDefaultHttpClient( @Nullable InjectionPoint injectionPoint, @Nullable LoadBalancer loadBalancer, @@ -327,7 +352,7 @@ private DefaultJdkHttpClient buildClient( } @Override - public HttpClient getClient(HttpVersionSelection httpVersion, String clientId, String path) { + public DefaultJdkHttpClient getClient(HttpVersionSelection httpVersion, String clientId, String path) { final ClientKey key = new ClientKey( httpVersion, clientId, @@ -367,6 +392,11 @@ public void close() throws Exception { clients.clear(); } + @Override + public @NonNull RawHttpClient getRawClient(@NonNull HttpVersionSelection httpVersion, @NonNull String clientId, @Nullable String path) { + return new JdkRawHttpClient(getClient(httpVersion, clientId, path)); + } + /** * Client key. * diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpRequestFactory.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpRequestFactory.java index 048fd544658..ada5561379c 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpRequestFactory.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpRequestFactory.java @@ -30,14 +30,19 @@ import io.micronaut.http.client.HttpClientConfiguration; import io.micronaut.http.codec.MediaTypeCodec; import io.micronaut.http.codec.MediaTypeCodecRegistry; +import reactor.adapter.JdkFlowAdapter; +import reactor.core.publisher.Flux; import java.io.ByteArrayOutputStream; import java.net.URI; import java.net.URLEncoder; import java.net.http.HttpRequest; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.Flow; import java.util.stream.Collectors; /** @@ -84,6 +89,16 @@ private static HttpRequest.BodyPublisher publisherForRequest( @Nullable MediaTypeCodecRegistry mediaTypeCodecRegistry, @NonNull MessageBodyHandlerRegistry messageBodyHandlerRegistry ) { + if (request instanceof RawHttpRequestWrapper raw) { + OptionalLong length = raw.byteBody().expectedLength(); + Flow.Publisher buffers = JdkFlowAdapter.publisherToFlowPublisher( + Flux.from(raw.byteBody().toByteArrayPublisher()).map(ByteBuffer::wrap)); + if (length.isPresent()) { + return HttpRequest.BodyPublishers.fromPublisher(buffers, length.getAsLong()); + } else { + return HttpRequest.BodyPublishers.fromPublisher(buffers); + } + } if (!HttpMethod.permitsRequestBody(request.getMethod())) { return HttpRequest.BodyPublishers.noBody(); } diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpResponseAdapter.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpResponseAdapter.java index 25b2b34e353..3f7a81b7fcc 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpResponseAdapter.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/HttpResponseAdapter.java @@ -21,13 +21,9 @@ import io.micronaut.core.annotation.Nullable; import io.micronaut.core.convert.ConversionContext; import io.micronaut.core.convert.ConversionService; -import io.micronaut.core.convert.value.MutableConvertibleValues; -import io.micronaut.core.convert.value.MutableConvertibleValuesMap; import io.micronaut.core.io.buffer.ByteArrayBufferFactory; import io.micronaut.core.type.Argument; -import io.micronaut.http.HttpHeaders; import io.micronaut.http.HttpResponse; -import io.micronaut.http.HttpStatus; import io.micronaut.http.MediaType; import io.micronaut.http.body.MessageBodyHandlerRegistry; import io.micronaut.http.body.MessageBodyReader; @@ -49,15 +45,12 @@ */ @Internal @Experimental -public class HttpResponseAdapter implements HttpResponse { +public class HttpResponseAdapter extends BaseHttpResponseAdapter { private static final Logger LOG = LoggerFactory.getLogger(HttpResponseAdapter.class); - private final java.net.http.HttpResponse httpResponse; @NonNull private final Argument bodyType; - private final ConversionService conversionService; - private final MutableConvertibleValues attributes = new MutableConvertibleValuesMap<>(); private final MediaTypeCodecRegistry mediaTypeCodecRegistry; private final MessageBodyHandlerRegistry messageBodyHandlerRegistry; @@ -67,38 +60,12 @@ public HttpResponseAdapter(java.net.http.HttpResponse httpResponse, ConversionService conversionService, MediaTypeCodecRegistry mediaTypeCodecRegistry, MessageBodyHandlerRegistry messageBodyHandlerRegistry) { - this.httpResponse = httpResponse; + super(httpResponse, conversionService); this.bodyType = bodyType; - this.conversionService = conversionService; this.mediaTypeCodecRegistry = mediaTypeCodecRegistry; this.messageBodyHandlerRegistry = messageBodyHandlerRegistry; } - @Override - public HttpStatus getStatus() { - return HttpStatus.valueOf(httpResponse.statusCode()); - } - - @Override - public int code() { - return httpResponse.statusCode(); - } - - @Override - public String reason() { - return getStatus().getReason(); - } - - @Override - public HttpHeaders getHeaders() { - return new HttpHeadersAdapter(httpResponse.headers(), conversionService); - } - - @Override - public MutableConvertibleValues getAttributes() { - return attributes; - } - @Override public Optional getBody() { return getBody(bodyType); diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkHttpClientFactory.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkHttpClientFactory.java index 22c51013db6..473df3cb6ab 100644 --- a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkHttpClientFactory.java +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkHttpClientFactory.java @@ -17,6 +17,8 @@ import io.micronaut.core.annotation.Experimental; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; import io.micronaut.core.convert.ConversionService; import io.micronaut.core.io.buffer.ByteArrayBufferFactory; import io.micronaut.http.MediaType; @@ -25,6 +27,8 @@ import io.micronaut.http.body.WritableBodyWriter; import io.micronaut.http.client.AbstractHttpClientFactory; import io.micronaut.http.client.HttpClientConfiguration; +import io.micronaut.http.client.RawHttpClient; +import io.micronaut.http.client.RawHttpClientFactory; import io.micronaut.json.JsonMapper; import io.micronaut.json.body.JsonMessageHandler; import io.micronaut.runtime.ApplicationConfiguration; @@ -38,7 +42,7 @@ */ @Internal @Experimental -public class JdkHttpClientFactory extends AbstractHttpClientFactory { +public class JdkHttpClientFactory extends AbstractHttpClientFactory implements RawHttpClientFactory { public JdkHttpClientFactory() { super(null, createDefaultMessageBodyHandlerRegistry(), ConversionService.SHARED); @@ -66,4 +70,9 @@ public static MessageBodyHandlerRegistry createDefaultMessageBodyHandlerRegistry registry.add(MediaType.APPLICATION_JSON_STREAM_TYPE, new JsonMessageHandler<>(mapper)); return registry; } + + @Override + public @NonNull RawHttpClient createRawClient(@Nullable URI url, @NonNull HttpClientConfiguration configuration) { + return new JdkRawHttpClient(createHttpClient(url, configuration)); + } } diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java new file mode 100644 index 00000000000..05706b4749b --- /dev/null +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/JdkRawHttpClient.java @@ -0,0 +1,90 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.jdk; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.type.Argument; +import io.micronaut.http.ByteBodyHttpResponseWrapper; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.client.RawHttpClient; +import io.micronaut.http.client.exceptions.HttpClientException; +import io.micronaut.http.util.HttpHeadersUtil; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.Optional; + +/** + * Implementation of {@link RawHttpClient} for the JDK http client. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +final class JdkRawHttpClient extends AbstractJdkHttpClient implements RawHttpClient { + public JdkRawHttpClient(AbstractJdkHttpClient prototype) { + super(prototype); + } + + @Override + public @NonNull Publisher> exchange(@NonNull HttpRequest request, @Nullable CloseableByteBody requestBody, @Nullable Thread blockedThread) { + return exchangeImpl(new RawHttpRequestWrapper(conversionService, request.toMutableRequest(), requestBody), null); + } + + @Override + public void close() throws IOException { + // Nothing to do here, we do not need to close clients + } + + @Override + protected Publisher> responsePublisher(@NonNull HttpRequest request, @Nullable Argument bodyType) { + return Mono.defer(() -> mapToHttpRequest(request, bodyType)) // defered so any client filter changes are used + .map(httpRequest -> { + if (log.isDebugEnabled()) { + log.debug("Client {} Sending HTTP Request: {}", clientId, httpRequest); + } + if (log.isTraceEnabled()) { + HttpHeadersUtil.trace(log, + () -> httpRequest.headers().map().keySet(), + headerName -> httpRequest.headers().allValues(headerName)); + } + BodySizeLimits bodySizeLimits = new BodySizeLimits(Long.MAX_VALUE, configuration.getMaxContentLength()); + return client.sendAsync(httpRequest, responseInfo -> new ReactiveByteBufferByteBody.ByteBodySubscriber(bodySizeLimits)); + }) + .flatMap(Mono::fromCompletionStage) + .onErrorMap(IOException.class, e -> new HttpClientException("Error sending request: " + e.getMessage(), e)) + .onErrorMap(InterruptedException.class, e -> new HttpClientException("Error sending request: " + e.getMessage(), e)) + .map(netResponse -> { + if (log.isDebugEnabled()) { + log.debug("Client {} Received HTTP Response: {} {}", clientId, netResponse.statusCode(), netResponse.uri()); + } + + //noinspection unchecked + return (HttpResponse) ByteBodyHttpResponseWrapper.wrap(new BaseHttpResponseAdapter(netResponse, conversionService) { + @Override + public @NonNull Optional getBody() { + return Optional.empty(); + } + }, netResponse.body()); + }); + } +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/RawHttpRequestWrapper.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/RawHttpRequestWrapper.java new file mode 100644 index 00000000000..5456d976d1f --- /dev/null +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/RawHttpRequestWrapper.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.jdk; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.http.MutableHttpRequest; +import io.micronaut.http.MutableHttpRequestWrapper; +import io.micronaut.http.ServerHttpRequest; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.netty.NettyHttpRequestBuilder; +import io.netty.handler.codec.http.HttpRequest; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This is a combination of a {@link HttpRequest} with a {@link ByteBody}. It implements + * {@link MutableHttpRequest} so that it can be used unchanged in the client, + * {@link NettyHttpRequestBuilder} so that the bytes are + * + * @param The body type, mostly unused + * @since 4.8.0 + */ +@Internal +final class RawHttpRequestWrapper extends MutableHttpRequestWrapper implements MutableHttpRequest, ServerHttpRequest, Closeable { + private final CloseableByteBody byteBody; + + public RawHttpRequestWrapper(ConversionService conversionService, MutableHttpRequest delegate, CloseableByteBody byteBody) { + super(conversionService, delegate); + this.byteBody = byteBody; + } + + @Override + public @NonNull ByteBody byteBody() { + return byteBody; + } + + @Override + public MutableHttpRequest body(T body) { + throw new UnsupportedOperationException("Changing the body of raw requests is currently not supported"); + } + + @Override + public void close() throws IOException { + byteBody.close(); + } +} diff --git a/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java new file mode 100644 index 00000000000..4e96a1ccd4f --- /dev/null +++ b/http-client-jdk/src/main/java/io/micronaut/http/client/jdk/ReactiveByteBufferByteBody.java @@ -0,0 +1,502 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.jdk; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.execution.DelayedExecutionFlow; +import io.micronaut.core.execution.ExecutionFlow; +import io.micronaut.core.io.buffer.ByteArrayBufferFactory; +import io.micronaut.http.body.CloseableAvailableByteBody; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.InternalByteBody; +import io.micronaut.http.body.stream.AvailableByteArrayBody; +import io.micronaut.http.body.stream.BaseSharedBuffer; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; +import io.micronaut.http.body.stream.PublisherAsBlocking; +import io.micronaut.http.body.stream.UpstreamBalancer; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.net.http.HttpResponse; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Streaming {@link io.micronaut.http.body.ByteBody} implementation for the JDK http client. + * + * @since 4.8.0 + * @author Jonas Konrad + */ +@Internal +final class ReactiveByteBufferByteBody implements CloseableByteBody, InternalByteBody { + private final SharedBuffer sharedBuffer; + private BufferConsumer.Upstream upstream; + + ReactiveByteBufferByteBody(SharedBuffer sharedBuffer) { + this(sharedBuffer, sharedBuffer.getRootUpstream()); + } + + private ReactiveByteBufferByteBody(SharedBuffer sharedBuffer, BufferConsumer.Upstream upstream) { + this.sharedBuffer = sharedBuffer; + this.upstream = upstream; + } + + private BufferConsumer.Upstream primary(ByteBufferConsumer primary) { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + BaseSharedBuffer.failClaim(); + } + this.upstream = null; + BaseSharedBuffer.logClaim(); + sharedBuffer.subscribe(primary, upstream); + return upstream; + } + + @Override + public @NonNull CloseableByteBody split(@NonNull SplitBackpressureMode backpressureMode) { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + BaseSharedBuffer.failClaim(); + } + UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode); + this.upstream = pair.left(); + this.sharedBuffer.reserve(); + return new ReactiveByteBufferByteBody(sharedBuffer, pair.right()); + } + + @Override + public @NonNull OptionalLong expectedLength() { + return sharedBuffer.getExpectedLength(); + } + + private Flux toNioBufferPublisher() { + AsFlux asFlux = new AsFlux(sharedBuffer); + BufferConsumer.Upstream upstream = primary(asFlux); + return asFlux.asFlux(upstream); + } + + @Override + public @NonNull InputStream toInputStream() { + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); + toNioBufferPublisher().subscribe(publisherAsBlocking); + return new InputStream() { + private ByteBuffer buffer; + + @Override + public int read() throws IOException { + byte[] arr = new byte[1]; + int n = read(arr); + return n == -1 ? -1 : arr[0] & 0xff; + } + + @Override + public int read(byte @NonNull [] b, int off, int len) throws IOException { + while (buffer == null) { + try { + ByteBuffer o = publisherAsBlocking.take(); + if (o == null) { + Throwable failure = publisherAsBlocking.getFailure(); + if (failure == null) { + return -1; + } else { + throw new IOException(failure); + } + } + if (!o.hasRemaining()) { + continue; + } + buffer = o; + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + + int toRead = Math.min(len, buffer.remaining()); + buffer.get(b, off, toRead); + if (buffer.remaining() == 0) { + buffer = null; + } + return toRead; + } + + @Override + public void close() { + publisherAsBlocking.close(); + } + }; + } + + @Override + public @NonNull Flux toByteArrayPublisher() { + return toNioBufferPublisher().map(ReactiveByteBufferByteBody::toByteArray); + } + + private static byte @NonNull [] toByteArray(ByteBuffer bb) { + byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + return bytes; + } + + @Override + public @NonNull Publisher> toByteBufferPublisher() { + return toByteArrayPublisher().map(ByteArrayBufferFactory.INSTANCE::wrap); + } + + @Override + public @NonNull ExecutionFlow bufferFlow() { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + BaseSharedBuffer.failClaim(); + } + this.upstream = null; + BaseSharedBuffer.logClaim(); + upstream.start(); + upstream.onBytesConsumed(Long.MAX_VALUE); + return sharedBuffer.subscribeFull(upstream) + .map(bb -> AvailableByteArrayBody.create(ByteArrayBufferFactory.INSTANCE, ReactiveByteBufferByteBody.toByteArray(bb))); + } + + @Override + public void close() { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + return; + } + this.upstream = null; + BaseSharedBuffer.logClaim(); + upstream.allowDiscard(); + upstream.disregardBackpressure(); + upstream.start(); + sharedBuffer.subscribe(null, upstream); + } + + @Override + public @NonNull CloseableByteBody allowDiscard() { + BufferConsumer.Upstream upstream = this.upstream; + if (upstream == null) { + BaseSharedBuffer.failClaim(); + } + upstream.allowDiscard(); + return this; + } + + interface ByteBufferConsumer extends BufferConsumer { + void add(@NonNull ByteBuffer buffer); + } + + private static final class AsFlux extends BaseSharedBuffer.AsFlux implements ByteBufferConsumer { + AsFlux(BaseSharedBuffer sharedBuffer) { + super(sharedBuffer); + } + + @Override + protected int size(ByteBuffer buf) { + return buf.remaining(); + } + + @Override + public void add(ByteBuffer buffer) { + add0(buffer); + } + } + + /** + * Simple implementation of {@link BaseSharedBuffer} that consumes {@link ByteBuffer}s.
+ * Buffering is done using a {@link ByteArrayOutputStream}. Concurrency control is done through + * a non-reentrant lock based on {@link AtomicReference}. + */ + static final class SharedBuffer extends BaseSharedBuffer implements ByteBufferConsumer { + // fields for concurrency control, see #submit + private final AtomicReference workState = new AtomicReference<>(WorkState.CLEAN); + private final ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue<>(); + + private SnapshotByteArrayOutputStream buffer; + private ByteBuffer adding; + + public SharedBuffer(BodySizeLimits limits, Upstream rootUpstream) { + super(limits, rootUpstream); + } + + /** + * Run a task non-concurrently with other submitted tasks. This method fulfills multiple + * constraints:
+ *
    + *
  • It does not block (like a simple lock would) when another thread is already + * working. Instead, the submitted task will be run at a later time on the other + * thread.
  • + *
  • Tasks submitted on one thread will not be reordered (local order). This is + * similar to {@code EventLoopFlow} semantics.
  • + *
  • Reentrant calls (calls to {@code submit} from inside a submitted task) will + * delay the second task until the first task is complete.
  • + *
  • There is no executor to run tasks. This ensures good locality when submissions + * have low contention (i.e. tasks are usually run immediately on the submitting + * thread).
  • + *
+ * + * @param task The task to run + */ + private void submit(Runnable task) { + /* + * This implementation is fairly simple: A work queue that contains all the tasks, and + * an atomic field to control concurrency. + */ + + workQueue.add(task); + if (workState.getAndUpdate(ws -> ws == WorkState.CLEAN ? WorkState.WORKING_THEN_CLEAN : WorkState.WORKING_THEN_DIRTY) != WorkState.CLEAN) { + // another thread is working and will pick up our task + return; + } + + // it's our turn + while (true) { + while (true) { + task = workQueue.poll(); + if (task == null) { + break; + } + task.run(); + } + + if (workState.compareAndSet(WorkState.WORKING_THEN_CLEAN, WorkState.CLEAN)) { + // done! + break; + } + + // some other thread added a task in the meantime, run it. + workState.set(WorkState.WORKING_THEN_CLEAN); + } + } + + public void reserve() { + submit(this::reserve0); + } + + public void subscribe(@Nullable ByteBufferConsumer consumer, Upstream upstream) { + submit(() -> subscribe0(consumer, upstream)); + } + + public DelayedExecutionFlow subscribeFull(Upstream specificUpstream) { + DelayedExecutionFlow flow = DelayedExecutionFlow.create(); + submit(() -> subscribeFull0(flow, specificUpstream, false)); + return flow; + } + + @Override + protected void forwardInitialBuffer(@Nullable ByteBufferConsumer subscriber, boolean last) { + if (buffer != null) { + if (subscriber != null) { + subscriber.add(buffer.snapshot()); + } + if (last) { + buffer = null; + } + } + } + + @Override + protected ByteBuffer subscribeFullResult(boolean last) { + if (buffer == null) { + return ByteBuffer.allocate(0); + } else { + ByteBuffer snapshot = buffer.snapshot(); + if (last) { + buffer = null; + } + return snapshot; + } + } + + @Override + protected void addForward(List consumers) { + for (ByteBufferConsumer consumer : consumers) { + consumer.add(adding.asReadOnlyBuffer()); // we want independent positions + } + } + + @Override + protected void addBuffer() { + if (buffer == null) { + buffer = new SnapshotByteArrayOutputStream(); + } + buffer.write(adding); + } + + @Override + protected void discardBuffer() { + buffer = null; + } + + @Override + public void add(ByteBuffer buffer) { + submit(() -> { + adding = buffer; + add(buffer.remaining()); + adding = null; + }); + } + + @Override + public void error(Throwable e) { + submit(() -> super.error(e)); + } + + @Override + public void complete() { + submit(super::complete); + } + } + + /** + * {@link ByteArrayOutputStream} implementations that allows taking an efficient snapshot of + * the current data. + */ + private static final class SnapshotByteArrayOutputStream extends ByteArrayOutputStream { + public ByteBuffer snapshot() { + return ByteBuffer.wrap(buf, 0, count).asReadOnlyBuffer(); + } + + public void write(ByteBuffer buffer) { + if (buffer.hasArray()) { + write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } else { + byte[] b = new byte[buffer.remaining()]; + buffer.get(buffer.position(), b); + write(b, 0, b.length); + } + } + } + + private enum WorkState { + CLEAN, + WORKING_THEN_CLEAN, + WORKING_THEN_DIRTY + } + + /** + * {@link HttpResponse.BodySubscriber} implementation that pushes data into a + * {@link SharedBuffer}. + */ + static final class ByteBodySubscriber implements HttpResponse.BodySubscriber, BufferConsumer.Upstream { + private final SharedBuffer sharedBuffer; + private final CloseableByteBody root; + private final AtomicLong demand = new AtomicLong(0); + private Flow.Subscription subscription; + private boolean cancelled; + private volatile boolean disregardBackpressure; + + ByteBodySubscriber(BodySizeLimits limits) { + sharedBuffer = new SharedBuffer(limits, this); + root = new ReactiveByteBufferByteBody(sharedBuffer); + } + + @Override + public CompletionStage getBody() { + return CompletableFuture.completedFuture(root); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + boolean initialDemand; + boolean cancelled; + synchronized (this) { + this.subscription = subscription; + cancelled = this.cancelled; + initialDemand = demand.get() > 0; + } + if (cancelled) { + subscription.cancel(); + } else if (initialDemand) { + subscription.request(disregardBackpressure ? Long.MAX_VALUE : 1); + } + } + + @Override + public void onNext(List item) { + for (ByteBuffer buffer : item) { + int n = buffer.remaining(); + demand.addAndGet(-n); + sharedBuffer.add(buffer); + } + if (demand.get() > 0) { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + sharedBuffer.error(throwable); + } + + @Override + public void onComplete() { + sharedBuffer.complete(); + } + + @Override + public void start() { + Flow.Subscription initialDemand; + synchronized (this) { + initialDemand = subscription; + demand.set(1); + } + if (initialDemand != null) { + initialDemand.request(1); + } + } + + @Override + public void onBytesConsumed(long bytesConsumed) { + long prev = demand.getAndAdd(bytesConsumed); + if (prev <= 0 && prev + bytesConsumed > 0) { + subscription.request(1); + } + } + + @Override + public void allowDiscard() { + Flow.Subscription subscription; + synchronized (this) { + cancelled = true; + subscription = this.subscription; + } + if (subscription != null) { + subscription.cancel(); + } + } + + @Override + public void disregardBackpressure() { + disregardBackpressure = true; + if (subscription != null) { + subscription.request(Long.MAX_VALUE); + } + } + } +} diff --git a/http-client-jdk/src/main/resources/META-INF/services/io.micronaut.http.client.RawHttpClientFactory b/http-client-jdk/src/main/resources/META-INF/services/io.micronaut.http.client.RawHttpClientFactory new file mode 100644 index 00000000000..b0dd611e313 --- /dev/null +++ b/http-client-jdk/src/main/resources/META-INF/services/io.micronaut.http.client.RawHttpClientFactory @@ -0,0 +1 @@ +io.micronaut.http.client.jdk.JdkHttpClientFactory diff --git a/http-client-tck/src/main/java/io/micronaut/http/client/tck/tests/RawTest.java b/http-client-tck/src/main/java/io/micronaut/http/client/tck/tests/RawTest.java index 2985c78ab93..4f2ad67488b 100644 --- a/http-client-tck/src/main/java/io/micronaut/http/client/tck/tests/RawTest.java +++ b/http-client-tck/src/main/java/io/micronaut/http/client/tck/tests/RawTest.java @@ -74,6 +74,23 @@ public void getLong() throws Exception { } } + @Test + public void getLongInputStream() throws Exception { + try (ServerUnderTest server = ServerUnderTestProviderUtils.getServerUnderTestProvider().getServer(SPEC_NAME); + RawHttpClient client = server.getApplicationContext().createBean(RawHttpClient.class); + ByteBodyHttpResponse response = Mono.from(client.exchange(HttpRequest.GET(server.getURL().get() + "/raw/get-long"), null, null)) + .cast(ByteBodyHttpResponse.class) + .block()) { + + try (InputStream is = response.byteBody().toInputStream()) { + Assertions.assertArrayEquals( + LONG_PAYLOAD, + is.readAllBytes() + ); + } + } + } + @Test public void echoLong() throws Exception { try (ServerUnderTest server = ServerUnderTestProviderUtils.getServerUnderTestProvider().getServer(SPEC_NAME); diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java index d5e1ec83b0f..274a01739fd 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java @@ -45,6 +45,7 @@ import io.micronaut.http.MediaType; import io.micronaut.http.MutableHttpHeaders; import io.micronaut.http.MutableHttpRequest; +import io.micronaut.http.MutableHttpRequestWrapper; import io.micronaut.http.MutableHttpResponse; import io.micronaut.http.bind.DefaultRequestBinderRegistry; import io.micronaut.http.bind.RequestBinderRegistry; @@ -92,7 +93,7 @@ import io.micronaut.http.netty.NettyHttpRequestBuilder; import io.micronaut.http.netty.NettyHttpResponseBuilder; import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.netty.body.BodySizeLimits; +import io.micronaut.http.body.stream.BodySizeLimits; import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.body.NettyByteBody; import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler; @@ -124,7 +125,6 @@ import io.netty.buffer.ByteBufHolder; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.EmptyByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; @@ -1822,7 +1822,7 @@ private NettyByteBody buildFormRequest( em.onRequest(n -> { try { while (n-- > 0) { - HttpContent chunk = encoder.readChunk(PooledByteBufAllocator.DEFAULT); + HttpContent chunk = encoder.readChunk(ByteBufAllocator.DEFAULT); if (chunk == null) { assert encoder.isEndOfInput(); em.complete(); diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/Http1ResponseHandler.java b/http-client/src/main/java/io/micronaut/http/client/netty/Http1ResponseHandler.java index 7af3435a7d9..5091d0fd362 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/Http1ResponseHandler.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/Http1ResponseHandler.java @@ -18,10 +18,10 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; import io.micronaut.http.client.exceptions.ResponseClosedException; import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.netty.body.BodySizeLimits; -import io.micronaut.http.netty.body.BufferConsumer; import io.micronaut.http.netty.body.StreamingNettyByteBody; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/RawHttpRequestWrapper.java b/http-client/src/main/java/io/micronaut/http/client/netty/RawHttpRequestWrapper.java index a3669807523..1e021405788 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/RawHttpRequestWrapper.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/RawHttpRequestWrapper.java @@ -20,6 +20,7 @@ import io.micronaut.core.annotation.Nullable; import io.micronaut.core.convert.ConversionService; import io.micronaut.http.MutableHttpRequest; +import io.micronaut.http.MutableHttpRequestWrapper; import io.micronaut.http.ServerHttpRequest; import io.micronaut.http.body.ByteBody; import io.micronaut.http.body.CloseableByteBody; diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/SseSplitter.java b/http-client/src/main/java/io/micronaut/http/client/netty/SseSplitter.java index 7aadca76ee0..024d6c5c9fb 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/SseSplitter.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/SseSplitter.java @@ -17,8 +17,8 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; +import io.micronaut.http.body.stream.BodySizeLimits; import io.micronaut.http.client.exceptions.ContentLengthExceededException; -import io.micronaut.http.netty.body.BodySizeLimits; import io.netty.buffer.ByteBuf; import reactor.core.publisher.Flux; diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java b/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java index 1ea457aa490..1e0a8d6abd8 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java @@ -17,7 +17,7 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.http.netty.EventLoopFlow; -import io.micronaut.http.netty.body.BufferConsumer; +import io.micronaut.http.netty.body.ByteBufConsumer; import io.micronaut.http.netty.body.StreamingNettyByteBody; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; @@ -36,7 +36,7 @@ * @since 4.7.0 */ @Internal -final class StreamWriter extends ChannelInboundHandlerAdapter implements BufferConsumer { +final class StreamWriter extends ChannelInboundHandlerAdapter implements ByteBufConsumer { private final Consumer errorHandler; private ChannelHandlerContext ctx; private EventLoopFlow flow; diff --git a/http-client/src/test/groovy/io/micronaut/http/client/netty/Http1ResponseHandlerSpec.groovy b/http-client/src/test/groovy/io/micronaut/http/client/netty/Http1ResponseHandlerSpec.groovy index 7a83a8964d5..438cd5669f7 100644 --- a/http-client/src/test/groovy/io/micronaut/http/client/netty/Http1ResponseHandlerSpec.groovy +++ b/http-client/src/test/groovy/io/micronaut/http/client/netty/Http1ResponseHandlerSpec.groovy @@ -3,7 +3,7 @@ package io.micronaut.http.client.netty import io.micronaut.http.body.AvailableByteBody import io.micronaut.http.body.CloseableByteBody import io.micronaut.http.body.InternalByteBody -import io.micronaut.http.netty.body.BufferConsumer +import io.micronaut.http.netty.body.ByteBufConsumer import io.micronaut.http.netty.body.StreamingNettyByteBody import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled @@ -211,7 +211,7 @@ class Http1ResponseHandlerSpec extends Specification { when: def completed = false def buffer = Unpooled.compositeBuffer() - def upstream = ((StreamingNettyByteBody) listener.body).primary(new BufferConsumer() { + def upstream = ((StreamingNettyByteBody) listener.body).primary(new ByteBufConsumer() { @Override void add(ByteBuf buf) { buffer.addComponent(true, buf) diff --git a/http-client/src/test/groovy/io/micronaut/http/client/netty/SseSplitterSpec.groovy b/http-client/src/test/groovy/io/micronaut/http/client/netty/SseSplitterSpec.groovy index 7545ed92931..7e269757e36 100644 --- a/http-client/src/test/groovy/io/micronaut/http/client/netty/SseSplitterSpec.groovy +++ b/http-client/src/test/groovy/io/micronaut/http/client/netty/SseSplitterSpec.groovy @@ -1,6 +1,6 @@ package io.micronaut.http.client.netty -import io.micronaut.http.netty.body.BodySizeLimits +import io.micronaut.http.body.stream.BodySizeLimits import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled import reactor.core.publisher.Flux diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java index 27a9641e3d7..3e84d64c22c 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsStream.java @@ -17,6 +17,7 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; +import io.micronaut.http.body.stream.PublisherAsBlocking; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -46,7 +47,7 @@ public int read() throws IOException { } @Override - public int read(@NonNull byte[] b, int off, int len) throws IOException { + public int read(byte @NonNull [] b, int off, int len) throws IOException { while (buffer == null) { try { ByteBuf o = publisherAsBlocking.take(); diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java index 962166591ab..b5361ef5efd 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java @@ -24,6 +24,9 @@ import io.micronaut.http.body.AvailableByteBody; import io.micronaut.http.body.CloseableAvailableByteBody; import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.body.stream.BaseSharedBuffer; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufUtil; @@ -113,12 +116,10 @@ public long length() { private ByteBuf claim() { ByteBuf b = buffer; if (b == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } this.buffer = null; - if (LOG.isTraceEnabled()) { - LOG.trace("Body claimed at this location. This is not an error, but may aid in debugging other errors", new Exception()); - } + BaseSharedBuffer.logClaim(); return b; } @@ -175,10 +176,7 @@ protected Flux toByteBufPublisher() { public @NonNull CloseableAvailableByteBody split() { ByteBuf b = buffer; if (b == null) { - failClaim(); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Body split at this location. This is not an error, but may aid in debugging other errors", new Exception()); + BaseSharedBuffer.failClaim(); } return new AvailableNettyByteBody(b.retainedSlice()); } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/ByteBufConsumer.java b/http-netty/src/main/java/io/micronaut/http/netty/body/ByteBufConsumer.java new file mode 100644 index 00000000000..1bd11c35ef1 --- /dev/null +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/ByteBufConsumer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.netty.body; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.http.body.stream.BufferConsumer; +import io.netty.buffer.ByteBuf; + +/** + * This is a netty-specific reactor-like API for streaming bytes. It's a bit better than reactor + * because it's more explicit about reference counting semantics, has more fine-grained controls + * for cancelling, and has more relaxed concurrency semantics. + * + * @since 4.5.0 + * @author Jonas Konrad + */ +@Internal +public interface ByteBufConsumer extends BufferConsumer { + /** + * Consume a buffer. Release ownership is transferred to this consumer. + * + * @param buf The buffer to consume + */ + void add(@NonNull ByteBuf buf); +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java index 361c42ed8df..f8c05103fff 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyBodyAdapter.java @@ -20,6 +20,8 @@ import io.micronaut.core.annotation.Nullable; import io.micronaut.http.body.AvailableByteBody; import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; import io.micronaut.http.netty.EventLoopFlow; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBody.java index 919006eff7c..ede22fc01ad 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/NettyByteBody.java @@ -40,7 +40,7 @@ @Internal public abstract sealed class NettyByteBody implements ByteBody, InternalByteBody permits AvailableNettyByteBody, StreamingNettyByteBody { // don't change this, isolate body buffering to separate logging name space - protected static final Logger LOG = LoggerFactory.getLogger(NettyByteBufferFactory.class); + protected static final Logger LOG = LoggerFactory.getLogger(ByteBody.class); public static Flux toByteBufs(ByteBody body) { if (body instanceof NettyByteBody net) { @@ -69,8 +69,4 @@ public static Flux toByteBufs(ByteBody body) { public abstract @NonNull ExecutionFlow bufferFlow(); abstract Flux toByteBufPublisher(); - - static void failClaim() { - throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for io.micronaut.http.netty.body.NettyByteBody."); - } } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java index b403396c4c8..0d3866b56a0 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/StreamingNettyByteBody.java @@ -23,9 +23,11 @@ import io.micronaut.core.util.SupplierUtil; import io.micronaut.http.body.CloseableAvailableByteBody; import io.micronaut.http.body.CloseableByteBody; -import io.micronaut.http.exceptions.BufferLengthExceededException; -import io.micronaut.http.exceptions.ContentLengthExceededException; -import io.micronaut.http.netty.PublisherAsBlocking; +import io.micronaut.http.body.stream.BaseSharedBuffer; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; +import io.micronaut.http.body.stream.PublisherAsBlocking; +import io.micronaut.http.body.stream.UpstreamBalancer; import io.micronaut.http.netty.PublisherAsStream; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; @@ -38,13 +40,10 @@ import io.netty.util.ResourceLeakDetectorFactory; import io.netty.util.ResourceLeakTracker; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import java.io.InputStream; -import java.util.ArrayList; import java.util.List; import java.util.OptionalLong; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; /** @@ -74,7 +73,7 @@ public final class StreamingNettyByteBody extends NettyByteBody implements Close private BufferConsumer.Upstream upstream; public StreamingNettyByteBody(SharedBuffer sharedBuffer) { - this(sharedBuffer, false, sharedBuffer.rootUpstream); + this(sharedBuffer, false, sharedBuffer.getRootUpstream()); } private StreamingNettyByteBody(SharedBuffer sharedBuffer, boolean forceDelaySubscribe, BufferConsumer.Upstream upstream) { @@ -83,12 +82,13 @@ private StreamingNettyByteBody(SharedBuffer sharedBuffer, boolean forceDelaySubs this.upstream = upstream; } - public BufferConsumer.Upstream primary(BufferConsumer primary) { + public BufferConsumer.Upstream primary(ByteBufConsumer primary) { BufferConsumer.Upstream upstream = this.upstream; if (upstream == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } this.upstream = null; + BaseSharedBuffer.logClaim(); sharedBuffer.subscribe(primary, upstream, forceDelaySubscribe); return upstream; } @@ -97,7 +97,7 @@ public BufferConsumer.Upstream primary(BufferConsumer primary) { public @NonNull CloseableByteBody split(@NonNull SplitBackpressureMode backpressureMode) { BufferConsumer.Upstream upstream = this.upstream; if (upstream == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } UpstreamBalancer.UpstreamPair pair = UpstreamBalancer.balancer(upstream, backpressureMode); this.upstream = pair.left(); @@ -109,7 +109,7 @@ public BufferConsumer.Upstream primary(BufferConsumer primary) { public @NonNull StreamingNettyByteBody allowDiscard() { BufferConsumer.Upstream upstream = this.upstream; if (upstream == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } upstream.allowDiscard(); return this; @@ -117,54 +117,25 @@ public BufferConsumer.Upstream primary(BufferConsumer primary) { @Override protected Flux toByteBufPublisher() { - AtomicLong unconsumed = new AtomicLong(0); - Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - BufferConsumer.Upstream upstream = primary(new BufferConsumer() { - @Override - public void add(ByteBuf buf) { - long newLength = unconsumed.addAndGet(buf.readableBytes()); - if (newLength > sharedBuffer.limits.maxBufferSize()) { - sink.tryEmitError(new BufferLengthExceededException(sharedBuffer.limits.maxBufferSize(), newLength)); - buf.release(); - } else { - if (sink.tryEmitNext(buf) != Sinks.EmitResult.OK) { - buf.release(); - } - } - } - - @Override - public void complete() { - sink.tryEmitComplete(); - } - - @Override - public void error(Throwable e) { - sink.tryEmitError(e); - } - }); - return sink.asFlux() - .doOnSubscribe(s -> upstream.start()) - .doOnNext(bb -> { - unconsumed.addAndGet(-bb.readableBytes()); - upstream.onBytesConsumed(bb.readableBytes()); - }) - .doOnDiscard(ByteBuf.class, ReferenceCounted::release) - .doOnCancel(() -> { - upstream.allowDiscard(); - upstream.disregardBackpressure(); - }); + AsFlux asFlux = new AsFlux(sharedBuffer); + BufferConsumer.Upstream upstream = primary(asFlux); + return asFlux.asFlux(upstream) + .doOnDiscard(ByteBuf.class, ReferenceCounted::release); } @Override public @NonNull OptionalLong expectedLength() { - long l = sharedBuffer.expectedLength; - return l < 0 ? OptionalLong.empty() : OptionalLong.of(l); + return sharedBuffer.getExpectedLength(); } @Override public @NonNull InputStream toInputStream() { - PublisherAsBlocking blocking = new PublisherAsBlocking<>(); + PublisherAsBlocking blocking = new PublisherAsBlocking<>() { + @Override + protected void release(ByteBuf item) { + item.release(); + } + }; toByteBufPublisher().subscribe(blocking); return new PublisherAsStream(blocking); } @@ -173,9 +144,10 @@ public void error(Throwable e) { public @NonNull ExecutionFlow bufferFlow() { BufferConsumer.Upstream upstream = this.upstream; if (upstream == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } this.upstream = null; + BaseSharedBuffer.logClaim(); upstream.start(); upstream.onBytesConsumed(Long.MAX_VALUE); return sharedBuffer.subscribeFull(upstream, forceDelaySubscribe).map(AvailableNettyByteBody::new); @@ -198,20 +170,39 @@ public void close() { return; } this.upstream = null; + BaseSharedBuffer.logClaim(); upstream.allowDiscard(); upstream.disregardBackpressure(); upstream.start(); sharedBuffer.subscribe(null, upstream, forceDelaySubscribe); } + private static final class AsFlux extends BaseSharedBuffer.AsFlux implements ByteBufConsumer { + public AsFlux(BaseSharedBuffer sharedBuffer) { + super(sharedBuffer); + } + + @Override + public void add(ByteBuf buf) { + if (!add0(buf)) { + buf.release(); + } + } + + @Override + protected int size(ByteBuf buf) { + return buf.readableBytes(); + } + } + /** * This class buffers input data and distributes it to multiple {@link StreamingNettyByteBody} * instances. - *

Thread safety: The {@link BufferConsumer} methods must only be called from one + *

Thread safety: The {@link ByteBufConsumer} methods must only be called from one * thread, the {@link #eventLoop} thread. The other methods (subscribe, reserve) can be * called from any thread. */ - public static final class SharedBuffer implements BufferConsumer { + public static final class SharedBuffer extends BaseSharedBuffer implements ByteBufConsumer { private static final Supplier> LEAK_DETECTOR = SupplierUtil.memoized(() -> ResourceLeakDetectorFactory.instance().newResourceLeakDetector(SharedBuffer.class)); @@ -219,93 +210,27 @@ public static final class SharedBuffer implements BufferConsumer { private final ResourceLeakTracker tracker = LEAK_DETECTOR.get().track(this); private final EventLoop eventLoop; - private final BodySizeLimits limits; - /** - * Upstream of all subscribers. This is only used to cancel incoming data if the max - * request size is exceeded. - */ - private final Upstream rootUpstream; /** * Buffered data. This is forwarded to new subscribers. */ private CompositeByteBuf buffer; - /** - * Whether the input is complete. - */ - private boolean complete; - /** - * Any stream error. - */ - private Throwable error; - /** - * Number of reserved subscriber spots. A new subscription MUST be preceded by a - * reservation, and every reservation MUST have a subscription. - */ - private int reserved = 1; - /** - * Active subscribers. - */ - private List<@NonNull BufferConsumer> subscribers; /** * Active subscribers that need the fully buffered body. */ private List<@NonNull DelayedExecutionFlow> fullSubscribers; - /** - * This flag is only used in tests, to verify that the BufferConsumer methods arent called - * in a reentrant fashion. - */ - private boolean working = false; - /** - * {@code true} during {@link #add(ByteBuf)} to avoid reentrant subscribe or reserve calls. - * Field must only be accessed on the event loop. - */ - private boolean adding = false; - /** - * Number of bytes received so far. - */ - private long lengthSoFar = 0; - /** - * The expected length of the whole body. This is -1 if we're uncertain, otherwise it must - * be accurate. This can come from a content-length header, but it's also set once the full - * body has been received. - */ - private volatile long expectedLength = -1; + private ByteBuf addingBuffer; public SharedBuffer(EventLoop loop, BodySizeLimits limits, Upstream rootUpstream) { + super(limits, rootUpstream); this.eventLoop = loop; - this.limits = limits; - this.rootUpstream = rootUpstream; } public void setExpectedLengthFrom(HttpHeaders headers) { - String s = headers.get(HttpHeaderNames.CONTENT_LENGTH); - if (s == null) { - return; - } - long parsed; - try { - parsed = Long.parseLong(s); - } catch (NumberFormatException e) { - return; - } - if (parsed < 0) { - return; - } - if (parsed > limits.maxBodySize()) { - error(new ContentLengthExceededException(limits.maxBodySize(), parsed)); - } - setExpectedLength(parsed); - } - - public void setExpectedLength(long length) { - if (length < 0) { - throw new IllegalArgumentException("Should be > 0"); - } - this.expectedLength = length; + setExpectedLengthFrom(headers.get(HttpHeaderNames.CONTENT_LENGTH)); } boolean reserve() { - if (eventLoop.inEventLoop() && !adding) { + if (eventLoop.inEventLoop() && addingBuffer == null) { reserve0(); return false; } else { @@ -314,11 +239,9 @@ boolean reserve() { } } - private void reserve0() { - if (reserved == 0) { - throw new IllegalStateException("Cannot go from streaming state back to buffering state"); - } - reserved++; + @Override + protected void reserve0() { + super.reserve0(); if (tracker != null) { tracker.record(); } @@ -331,28 +254,17 @@ private void reserve0() { * @param specificUpstream The upstream for the subscriber. This is used to call allowDiscard if there was an error * @param forceDelay Whether to require an {@link EventLoop#execute} call to ensure serialization with previous {@link #reserve()} call */ - void subscribe(@Nullable BufferConsumer subscriber, Upstream specificUpstream, boolean forceDelay) { - if (!forceDelay && eventLoop.inEventLoop() && !adding) { + void subscribe(@Nullable ByteBufConsumer subscriber, Upstream specificUpstream, boolean forceDelay) { + if (!forceDelay && eventLoop.inEventLoop() && addingBuffer == null) { subscribe0(subscriber, specificUpstream); } else { eventLoop.execute(() -> subscribe0(subscriber, specificUpstream)); } } - private void subscribe0(@Nullable BufferConsumer subscriber, Upstream specificUpstream) { - assert !working; - - if (reserved == 0) { - throw new IllegalStateException("Need to reserve a spot first"); - } - - working = true; - boolean last = --reserved == 0; + @Override + protected void forwardInitialBuffer(@Nullable ByteBufConsumer subscriber, boolean last) { if (subscriber != null) { - if (subscribers == null) { - subscribers = new ArrayList<>(1); - } - subscribers.add(subscriber); if (buffer != null) { if (last) { subscriber.add(buffer.slice()); @@ -361,21 +273,16 @@ private void subscribe0(@Nullable BufferConsumer subscriber, Upstream specificUp subscriber.add(buffer.retainedSlice()); } } - if (error != null) { - subscriber.error(error); - } else if (lengthSoFar > limits.maxBufferSize()) { - subscriber.error(new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar)); - specificUpstream.allowDiscard(); - } - if (complete) { - subscriber.complete(); - } } else { if (buffer != null && last) { buffer.release(); buffer = null; } } + } + + @Override + protected void afterSubscribe(boolean last) { if (tracker != null) { if (last) { tracker.close(this); @@ -383,7 +290,19 @@ private void subscribe0(@Nullable BufferConsumer subscriber, Upstream specificUp tracker.record(); } } - working = false; + } + + @Override + protected ByteBuf subscribeFullResult(boolean last) { + if (buffer == null) { + return Unpooled.EMPTY_BUFFER; + } else if (last) { + ByteBuf buf = buffer; + buffer = null; + return buf; + } else { + return buffer.retainedSlice(); + } } /** @@ -396,7 +315,7 @@ private void subscribe0(@Nullable BufferConsumer subscriber, Upstream specificUp */ ExecutionFlow subscribeFull(Upstream specificUpstream, boolean forceDelay) { DelayedExecutionFlow asyncFlow = DelayedExecutionFlow.create(); - if (!forceDelay && eventLoop.inEventLoop() && !adding) { + if (!forceDelay && eventLoop.inEventLoop() && addingBuffer == null) { return subscribeFull0(asyncFlow, specificUpstream, true); } else { eventLoop.execute(() -> { @@ -407,188 +326,39 @@ ExecutionFlow subscribeFull(Upstream specificUpstream, boolean forceDel } } - /** - * On-loop version of {@link #subscribeFull}. The returned flow will complete when the - * input is buffered. The returned flow will always be identical to the {@code targetFlow} - * parameter IF {@code canReturnImmediate} is false. If {@code canReturnImmediate} is true, - * this method will SOMETIMES return an immediate ExecutionFlow instead as an optimization. - * - * @param targetFlow The delayed flow to use if {@code canReturnImmediate} is false and/or - * we have to wait for the result - * @param canReturnImmediate Whether we can return an immediate ExecutionFlow instead of - * {@code targetFlow}, when appropriate - */ - private ExecutionFlow subscribeFull0(DelayedExecutionFlow targetFlow, Upstream specificUpstream, boolean canReturnImmediate) { - assert !working; - - if (reserved <= 0) { - throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it"); - } - - ExecutionFlow ret = targetFlow; - - working = true; - boolean last = --reserved == 0; - Throwable error = this.error; - if (error == null && lengthSoFar > limits.maxBufferSize()) { - error = new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar); - specificUpstream.allowDiscard(); - } - if (error != null) { - if (canReturnImmediate) { - ret = ExecutionFlow.error(error); - } else { - targetFlow.completeExceptionally(error); - } - } else if (complete) { - ByteBuf buf; - if (buffer == null) { - buf = Unpooled.EMPTY_BUFFER; - } else if (last) { - buf = buffer; - buffer = null; - } else { - buf = buffer.retainedSlice(); - } - if (canReturnImmediate) { - ret = ExecutionFlow.just(buf); - } else { - targetFlow.complete(buf); - } - } else { - if (fullSubscribers == null) { - fullSubscribers = new ArrayList<>(1); - } - fullSubscribers.add(targetFlow); - } - if (tracker != null) { - if (last) { - tracker.close(this); - } else { - tracker.record(); - } - } - working = false; - - return ret; - } - @Override public void add(ByteBuf buf) { - assert !working; - - buf.touch(); - - // calculate the new total length - long newLength = lengthSoFar + buf.readableBytes(); - long expectedLength = this.expectedLength; - if (expectedLength != -1 && newLength > expectedLength) { - throw new IllegalStateException("Received more bytes than specified by Content-Length"); - } - lengthSoFar = newLength; + addingBuffer = buf.touch(); + add(buf.readableBytes()); + addingBuffer = null; + } - // drop messages if we're done with all subscribers - if (complete || error != null) { - buf.release(); - return; - } - adding = true; - if (newLength > limits.maxBodySize()) { - // for maxBodySize, all subscribers get the error - buf.release(); - error(new ContentLengthExceededException(limits.maxBodySize(), newLength)); - rootUpstream.allowDiscard(); - adding = false; - return; + @Override + protected void addForward(List consumers) { + for (ByteBufConsumer consumer : consumers) { + consumer.add(addingBuffer.retainedSlice()); } + } - working = true; - if (subscribers != null) { - for (BufferConsumer subscriber : subscribers) { - subscriber.add(buf.retainedSlice()); - } - } - if (reserved > 0 || fullSubscribers != null) { - if (newLength > limits.maxBufferSize()) { - // new subscribers will recognize that the limit has been exceeded. Streaming - // subscribers can proceed normally. Need to notify buffering subscribers - buf.release(); - if (buffer != null) { - buffer.release(); - buffer = null; - } - if (fullSubscribers != null) { - Exception e = new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar); - for (DelayedExecutionFlow fullSubscriber : fullSubscribers) { - fullSubscriber.completeExceptionally(e); - } - } - } else { - if (buffer == null) { - buffer = buf.alloc().compositeBuffer(); - } - buffer.addComponent(true, buf); - } - } else { - buf.release(); + @Override + protected void addBuffer() { + if (buffer == null) { + buffer = addingBuffer.alloc().compositeBuffer(); } - adding = false; - working = false; + buffer.addComponent(true, addingBuffer); } @Override - public void complete() { - if (expectedLength > lengthSoFar) { - throw new IllegalStateException("Received fewer bytes than specified by Content-Length"); - } - complete = true; - expectedLength = lengthSoFar; - if (subscribers != null) { - for (BufferConsumer subscriber : subscribers) { - subscriber.complete(); - } - } - if (fullSubscribers != null) { - boolean release; - ByteBuf buf; - if (buffer == null) { - buf = Unpooled.EMPTY_BUFFER; - release = false; - } else { - buf = buffer; - if (reserved > 0) { - release = false; - } else { - this.buffer = null; - release = true; - } - } - for (DelayedExecutionFlow fullSubscriber : fullSubscribers) { - fullSubscriber.complete(buf.retainedSlice()); - } - if (release) { - buf.release(); - } - } + protected void addDoNotBuffer() { + addingBuffer.release(); } @Override - public void error(Throwable e) { - error = e; + protected void discardBuffer() { if (buffer != null) { buffer.release(); buffer = null; } - if (subscribers != null) { - for (BufferConsumer subscriber : subscribers) { - subscriber.error(e); - } - } - if (fullSubscribers != null) { - for (DelayedExecutionFlow fullSubscriber : fullSubscribers) { - fullSubscriber.completeExceptionally(e); - } - } } } } diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/body/NettyBodyAdapterSpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/body/NettyBodyAdapterSpec.groovy index 672dd011dc4..ee6497e4881 100644 --- a/http-netty/src/test/groovy/io/micronaut/http/netty/body/NettyBodyAdapterSpec.groovy +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/body/NettyBodyAdapterSpec.groovy @@ -12,7 +12,7 @@ class NettyBodyAdapterSpec extends Specification { def flux = Flux.just(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(new byte[] {1, 2, 3})) def adapter = NettyBodyAdapter.adapt(flux, new EmbeddedChannel().eventLoop()) def received = Unpooled.buffer() - def upstream = adapter.primary(new BufferConsumer() { + def upstream = adapter.primary(new ByteBufConsumer() { @Override void add(ByteBuf buf) { received.writeBytes(buf) diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/body/UpstreamBalancerSpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/body/UpstreamBalancerSpec.groovy index aa896fea6f8..dc54d098d13 100644 --- a/http-netty/src/test/groovy/io/micronaut/http/netty/body/UpstreamBalancerSpec.groovy +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/body/UpstreamBalancerSpec.groovy @@ -1,6 +1,8 @@ package io.micronaut.http.netty.body import io.micronaut.http.body.ByteBody.SplitBackpressureMode +import io.micronaut.http.body.stream.BufferConsumer +import io.micronaut.http.body.stream.UpstreamBalancer import spock.lang.Specification class UpstreamBalancerSpec extends Specification { diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java index ad5544ca4da..3719249aa65 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/HttpPipelineBuilder.java @@ -20,8 +20,8 @@ import io.micronaut.core.naming.Named; import io.micronaut.core.util.SupplierUtil; import io.micronaut.http.HttpVersion; +import io.micronaut.http.body.stream.BodySizeLimits; import io.micronaut.http.netty.channel.ChannelPipelineCustomizer; -import io.micronaut.http.netty.body.BodySizeLimits; import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration; import io.micronaut.http.server.netty.handler.Http2ServerHandler; import io.micronaut.http.server.netty.handler.PipeliningServerHandler; diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java index 0407456e084..0e88d3c5c4b 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/Http2ServerHandler.java @@ -18,8 +18,8 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.Nullable; import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; import io.micronaut.http.server.netty.HttpCompressionStrategy; -import io.micronaut.http.netty.body.BodySizeLimits; import io.micronaut.http.server.netty.handler.accesslog.Http2AccessLogConnectionEncoder; import io.micronaut.http.server.netty.handler.accesslog.Http2AccessLogFrameListener; import io.micronaut.http.server.netty.handler.accesslog.Http2AccessLogManager; diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/MultiplexedServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/MultiplexedServerHandler.java index f361e3e825a..24162105037 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/MultiplexedServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/MultiplexedServerHandler.java @@ -19,10 +19,11 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; import io.micronaut.http.netty.EventLoopFlow; import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.netty.body.BodySizeLimits; -import io.micronaut.http.netty.body.BufferConsumer; +import io.micronaut.http.netty.body.ByteBufConsumer; import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.body.NettyByteBody; import io.micronaut.http.netty.body.StreamingNettyByteBody; @@ -259,7 +260,7 @@ public void write(@NonNull HttpResponse response, @NonNull ByteBody body) { writeFull(response, AvailableNettyByteBody.toByteBuf(available)); } else { StreamingNettyByteBody snbb = (StreamingNettyByteBody) nbb; - var consumer = new BufferConsumer() { + var consumer = new ByteBufConsumer() { Upstream upstream; final EventLoopFlow flow = new EventLoopFlow(ctx.channel().eventLoop()); @@ -462,7 +463,7 @@ private void writeDataCompressing(ByteBuf data, boolean endStream, ChannelPromis * This is the {@link HotObservable} that represents the request body in the streaming * request case. */ - private class InputStreamer implements BufferConsumer.Upstream, BufferConsumer { + private class InputStreamer implements BufferConsumer.Upstream, ByteBufConsumer { final StreamingNettyByteBody.SharedBuffer dest = new StreamingNettyByteBody.SharedBuffer(ctx.channel().eventLoop(), bodySizeLimits, this); /** * Number of bytes that have been received by {@link #add(ByteBuf)} but the downstream diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java index ca661e9dd36..c15b9a54f61 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/handler/PipeliningServerHandler.java @@ -19,10 +19,11 @@ import io.micronaut.core.annotation.NonNull; import io.micronaut.core.annotation.Nullable; import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.stream.BodySizeLimits; +import io.micronaut.http.body.stream.BufferConsumer; import io.micronaut.http.netty.EventLoopFlow; import io.micronaut.http.netty.body.AvailableNettyByteBody; -import io.micronaut.http.netty.body.BodySizeLimits; -import io.micronaut.http.netty.body.BufferConsumer; +import io.micronaut.http.netty.body.ByteBufConsumer; import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.body.NettyByteBody; import io.micronaut.http.netty.body.StreamingNettyByteBody; @@ -1033,7 +1034,7 @@ void discardOutbound() { /** * Handler that writes a {@link StreamedHttpResponse}. */ - private final class StreamingOutboundHandler extends OutboundHandler implements BufferConsumer { + private final class StreamingOutboundHandler extends OutboundHandler implements ByteBufConsumer { private final EventLoopFlow flow = new EventLoopFlow(ctx.channel().eventLoop()); private final OutboundAccessImpl outboundAccess; private HttpResponse initialMessage; diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java index 95a7cff2378..e566accc0b5 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/multipart/NettyStreamingFileUpload.java @@ -20,10 +20,10 @@ import io.micronaut.core.naming.NameUtils; import io.micronaut.core.util.functional.ThrowingSupplier; import io.micronaut.http.MediaType; +import io.micronaut.http.body.stream.PublisherAsBlocking; import io.micronaut.http.multipart.MultipartException; import io.micronaut.http.multipart.PartData; import io.micronaut.http.multipart.StreamingFileUpload; -import io.micronaut.http.netty.PublisherAsBlocking; import io.micronaut.http.netty.PublisherAsStream; import io.micronaut.http.server.HttpServerConfiguration; import io.netty.buffer.ByteBuf; @@ -129,7 +129,12 @@ public Publisher delete() { @Override public InputStream asInputStream() { - PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>(); + PublisherAsBlocking publisherAsBlocking = new PublisherAsBlocking<>() { + @Override + protected void release(ByteBuf item) { + item.release(); + } + }; subject.map(pd -> ((NettyPartData) pd).getByteBuf()).subscribe(publisherAsBlocking); return new PublisherAsStream(publisherAsBlocking); } diff --git a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/BufferLeakDetection.groovy b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/BufferLeakDetection.groovy index 74243117c16..6c42b1c6480 100644 --- a/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/BufferLeakDetection.groovy +++ b/http-server-netty/src/test/groovy/io/micronaut/http/server/netty/fuzzing/BufferLeakDetection.groovy @@ -1,7 +1,7 @@ package io.micronaut.http.server.netty.fuzzing import io.netty.buffer.ByteBuf -import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.ByteBufAllocator import io.netty.util.ResourceLeakDetector import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -49,7 +49,7 @@ class BufferLeakDetection extends ResourceLeakDetector { } private static void leakCanary() { - ByteBuf resource = PooledByteBufAllocator.DEFAULT.directBuffer() + ByteBuf resource = ByteBufAllocator.DEFAULT.directBuffer() resource.touch(canaryString) } diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/MutableHttpRequestWrapper.java b/http/src/main/java/io/micronaut/http/MutableHttpRequestWrapper.java similarity index 85% rename from http-client/src/main/java/io/micronaut/http/client/netty/MutableHttpRequestWrapper.java rename to http/src/main/java/io/micronaut/http/MutableHttpRequestWrapper.java index d176ddf1113..6e42c7609fc 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/MutableHttpRequestWrapper.java +++ b/http/src/main/java/io/micronaut/http/MutableHttpRequestWrapper.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.client.netty; +package io.micronaut.http; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; @@ -21,11 +21,6 @@ import io.micronaut.core.convert.ArgumentConversionContext; import io.micronaut.core.convert.ConversionContext; import io.micronaut.core.convert.ConversionService; -import io.micronaut.http.HttpRequest; -import io.micronaut.http.HttpRequestWrapper; -import io.micronaut.http.MutableHttpHeaders; -import io.micronaut.http.MutableHttpParameters; -import io.micronaut.http.MutableHttpRequest; import io.micronaut.http.cookie.Cookie; import java.net.URI; @@ -38,7 +33,7 @@ * @since 4.0.0 */ @Internal -class MutableHttpRequestWrapper extends HttpRequestWrapper implements MutableHttpRequest { +public class MutableHttpRequestWrapper extends HttpRequestWrapper implements MutableHttpRequest { private ConversionService conversionService; @Nullable @@ -46,12 +41,12 @@ class MutableHttpRequestWrapper extends HttpRequestWrapper implements Muta @Nullable private URI uri; - MutableHttpRequestWrapper(ConversionService conversionService, HttpRequest delegate) { + protected MutableHttpRequestWrapper(ConversionService conversionService, HttpRequest delegate) { super(delegate); this.conversionService = conversionService; } - static MutableHttpRequest wrapIfNecessary(ConversionService conversionService, HttpRequest request) { + public static MutableHttpRequest wrapIfNecessary(ConversionService conversionService, HttpRequest request) { if (request instanceof MutableHttpRequest httpRequest) { return httpRequest; } else { diff --git a/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java b/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java index 258ccc9e366..9436b3291c7 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java +++ b/http/src/main/java/io/micronaut/http/body/stream/AvailableByteArrayBody.java @@ -56,7 +56,7 @@ public static AvailableByteArrayBody create(@NonNull ByteBufferFactory buf @Override public @NonNull CloseableAvailableByteBody split() { if (array == null) { - InputStreamByteBody.failClaim(); + BaseSharedBuffer.failClaim(); } return new AvailableByteArrayBody(bufferFactory, array); } @@ -69,7 +69,7 @@ public static AvailableByteArrayBody create(@NonNull ByteBufferFactory buf @Override public long length() { if (array == null) { - InputStreamByteBody.failClaim(); + BaseSharedBuffer.failClaim(); } return array.length; } @@ -78,9 +78,10 @@ public long length() { public byte @NonNull [] toByteArray() { byte[] a = array; if (a == null) { - InputStreamByteBody.failClaim(); + BaseSharedBuffer.failClaim(); } array = null; + BaseSharedBuffer.logClaim(); return a; } diff --git a/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java b/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java new file mode 100644 index 00000000000..4667c089319 --- /dev/null +++ b/http/src/main/java/io/micronaut/http/body/stream/BaseSharedBuffer.java @@ -0,0 +1,497 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.body.stream; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; +import io.micronaut.core.execution.DelayedExecutionFlow; +import io.micronaut.core.execution.ExecutionFlow; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.exceptions.BufferLengthExceededException; +import io.micronaut.http.exceptions.ContentLengthExceededException; +import org.jetbrains.annotations.Contract; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.OptionalLong; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Base type for a shared buffer that distributes a single {@link BufferConsumer} input to multiple + * streaming {@link io.micronaut.http.body.ByteBody}s.
+ * The subclass handles concurrency (for netty, event loop) and the specific buffer type + * (for netty, ByteBuf). + * + * @param The {@link BufferConsumer} type for the specific buffer type + * @param The type returned to {@link #subscribeFull0} subscribers. This is usually just the + * buffer type + */ +@Internal +public abstract class BaseSharedBuffer { + private static final Class SPLIT_LOG_CLASS = ByteBody.class; + private static final Logger SPLIT_LOG = LoggerFactory.getLogger(SPLIT_LOG_CLASS); + + private final BodySizeLimits limits; + /** + * Upstream of all subscribers. This is only used to cancel incoming data if the max + * request size is exceeded. + */ + private final BufferConsumer.Upstream rootUpstream; + /** + * Whether the input is complete. + */ + private boolean complete; + /** + * Any stream error. + */ + private Throwable error; + /** + * Number of reserved subscriber spots. A new subscription MUST be preceded by a + * reservation, and every reservation MUST have a subscription. + */ + private int reserved = 1; + /** + * Active subscribers. + */ + private List<@NonNull C> subscribers; + /** + * Active subscribers that need the fully buffered body. + */ + private List<@NonNull DelayedExecutionFlow> fullSubscribers; + /** + * This flag is only used in tests, to verify that the BufferConsumer methods arent called + * in a reentrant fashion. + */ + private boolean working = false; + /** + * Number of bytes received so far. + */ + private long lengthSoFar = 0; + /** + * The expected length of the whole body. This is -1 if we're uncertain, otherwise it must + * be accurate. This can come from a content-length header, but it's also set once the full + * body has been received. + */ + private volatile long expectedLength = -1; + + public BaseSharedBuffer(BodySizeLimits limits, BufferConsumer.Upstream rootUpstream) { + this.limits = limits; + this.rootUpstream = rootUpstream; + } + + @Contract("-> fail") + public static void failClaim() { + throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for " + SPLIT_LOG_CLASS.getName() + "."); + } + + public static void logClaim() { + if (SPLIT_LOG.isTraceEnabled()) { + SPLIT_LOG.trace("Body split at this location. This is not an error, but may aid in debugging other errors", new Exception()); + } + } + + /** + * Get the exact body length, if available. This is either set from {@code Content-Length} or + * when the body is fully buffered. + * + * @return The expected body length + */ + public final OptionalLong getExpectedLength() { + long l = expectedLength; + return l < 0 ? OptionalLong.empty() : OptionalLong.of(l); + } + + public final BodySizeLimits getLimits() { + return limits; + } + + public final BufferConsumer.Upstream getRootUpstream() { + return rootUpstream; + } + + public final void setExpectedLengthFrom(String contentLength) { + if (contentLength == null) { + return; + } + long parsed; + try { + parsed = Long.parseLong(contentLength); + } catch (NumberFormatException e) { + return; + } + if (parsed < 0) { + return; + } + if (parsed > limits.maxBodySize()) { + error(new ContentLengthExceededException(limits.maxBodySize(), parsed)); + } + setExpectedLength(parsed); + } + + public final void setExpectedLength(long length) { + if (length < 0) { + throw new IllegalArgumentException("Should be > 0"); + } + this.expectedLength = length; + } + + /** + * Reserve a spot for a future subscribe operation.
+ * Not thread safe, caller must handle concurrency. + */ + protected void reserve0() { + if (reserved == 0) { + throw new IllegalStateException("Cannot go from streaming state back to buffering state"); + } + reserved++; + } + + /** + * Forward any already-buffered data to the given new subscriber. + * + * @param subscriber The new subscriber, or {@code null} if the reservation has been cancelled + * and the data can just be discarded + * @param last {@code true} iff this was the last reservation and the buffer can be discarded + * after this call + */ + protected abstract void forwardInitialBuffer(@Nullable C subscriber, boolean last); + + /** + * Called after a subscribe operation. Used for leak detection. + * + * @param last {@code true} iff this was the last reservation + */ + protected void afterSubscribe(boolean last) { + } + + /** + * Called after {@link BufferConsumer#complete() completion} to get the data that should be + * forwarded to a {@link #subscribeFull0} subscriber. + * + * @param last {@code true} iff this was the last reservation and the buffer can be discarded + * after this call + * @return The full result that should be returned from the {@link #subscribeFull0} execution + * flow + */ + protected abstract F subscribeFullResult(boolean last); + + /** + * Add a subscriber. Must be preceded by a reservation.
+ * Not thread safe, caller must handle concurrency. + * + * @param subscriber The subscriber to add. Can be {@code null}, then the bytes will just be discarded + * @param specificUpstream The upstream for the subscriber. This is used to call allowDiscard if there was an error + */ + protected final void subscribe0(@Nullable C subscriber, BufferConsumer.Upstream specificUpstream) { + assert !working; + + if (reserved == 0) { + throw new IllegalStateException("Need to reserve a spot first"); + } + + working = true; + boolean last = --reserved == 0; + if (subscriber != null) { + if (subscribers == null) { + subscribers = new ArrayList<>(1); + } + subscribers.add(subscriber); + forwardInitialBuffer(subscriber, last); + if (error != null) { + subscriber.error(error); + } else if (lengthSoFar > limits.maxBufferSize()) { + subscriber.error(new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar)); + specificUpstream.allowDiscard(); + } + if (complete) { + subscriber.complete(); + } + } else { + forwardInitialBuffer(null, last); + } + afterSubscribe(last); + working = false; + } + + /** + * Optimized version of {@link #subscribe0} for subscribers that want to buffer the full + * body. The returned flow will complete when the + * input is buffered. The returned flow will always be identical to the {@code targetFlow} + * parameter IF {@code canReturnImmediate} is false. If {@code canReturnImmediate} is true, + * this method will SOMETIMES return an immediate ExecutionFlow instead as an optimization. + * + * @param targetFlow The delayed flow to use if {@code canReturnImmediate} is false and/or + * we have to wait for the result + * @param specificUpstream The upstream for the subscriber. This is used to call allowDiscard if there was an error + * @param canReturnImmediate Whether we can return an immediate ExecutionFlow instead of + * {@code targetFlow}, when appropriate + * @return A flow that will complete when all data has arrived, with a buffer containing that data + */ + protected final ExecutionFlow subscribeFull0(DelayedExecutionFlow targetFlow, BufferConsumer.Upstream specificUpstream, boolean canReturnImmediate) { + assert !working; + + if (reserved <= 0) { + throw new IllegalStateException("Need to reserve a spot first. This should not happen, StreamingNettyByteBody should guard against it"); + } + + ExecutionFlow ret = targetFlow; + + working = true; + boolean last = --reserved == 0; + Throwable error = this.error; + if (error == null && lengthSoFar > limits.maxBufferSize()) { + error = new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar); + specificUpstream.allowDiscard(); + } + if (error != null) { + if (canReturnImmediate) { + ret = ExecutionFlow.error(error); + } else { + targetFlow.completeExceptionally(error); + } + } else if (complete) { + F buf = subscribeFullResult(last); + if (canReturnImmediate) { + ret = ExecutionFlow.just(buf); + } else { + targetFlow.complete(buf); + } + } else { + if (fullSubscribers == null) { + fullSubscribers = new ArrayList<>(1); + } + fullSubscribers.add(targetFlow); + } + afterSubscribe(last); + working = false; + + return ret; + } + + /** + * Forward the input buffer to the given list of consumers. + * + * @param consumers The consumers to forward the data to + * @see #add(int) + */ + protected abstract void addForward(List consumers); + + /** + * Do not buffer the input buffer. + * + * @see #add(int) + */ + protected void addDoNotBuffer() { + } + + /** + * Buffer the input buffer. + * + * @see #add(int) + */ + protected abstract void addBuffer(); + + /** + * Discard the previously buffered bytes. + * + * @see #add(int) + */ + protected abstract void discardBuffer(); + + /** + * This method implements the {@link BufferConsumer} {@code add} logic in a + * buffer-type-independent way. It is not thread-safe: The subclass must take care of + * concurrency. The caller should store the actual buffer to be added in a field, then call + * this method, and use the stored buffer in the calls this method makes to + * {@link #addBuffer()}, {@link #addDoNotBuffer()} and {@link #addBuffer()}. After this method + * completes, the field can be cleared and those methods will not be called again. Example: + * + *

+     * {@code
+     *   ByteBuf adding;
+     *
+     *   public void add(ByteBuf buf) {
+     *       this.adding = buf;
+     *       add(buf.readableBytes());
+     *       this.adding = null;
+     *   }
+     *
+     *   @Override
+     *   protected void addForward(List consumers) {
+     *      for (ByteBufConsumer c : consumers) {
+     *          c.add(this.adding);
+     *      }
+     *   }
+     * }
+     * 
+ * + * @param n The number of bytes to add + */ + protected final void add(int n) { + assert !working; + + // calculate the new total length + long newLength = lengthSoFar + n; + long expectedLength = this.expectedLength; + if (expectedLength != -1 && newLength > expectedLength) { + throw new IllegalStateException("Received more bytes than specified by Content-Length"); + } + lengthSoFar = newLength; + + // drop messages if we're done with all subscribers + if (complete || error != null) { + addDoNotBuffer(); + return; + } + if (newLength > limits.maxBodySize()) { + // for maxBodySize, all subscribers get the error + addDoNotBuffer(); + error(new ContentLengthExceededException(limits.maxBodySize(), newLength)); + rootUpstream.allowDiscard(); + return; + } + + working = true; + if (subscribers != null) { + addForward(subscribers); + } + if (reserved > 0 || fullSubscribers != null) { + if (newLength > limits.maxBufferSize()) { + // new subscribers will recognize that the limit has been exceeded. Streaming + // subscribers can proceed normally. Need to notify buffering subscribers + addDoNotBuffer(); + discardBuffer(); + if (fullSubscribers != null) { + Exception e = new BufferLengthExceededException(limits.maxBufferSize(), lengthSoFar); + for (DelayedExecutionFlow fullSubscriber : fullSubscribers) { + fullSubscriber.completeExceptionally(e); + } + } + } else { + addBuffer(); + } + } else { + addDoNotBuffer(); + } + working = false; + } + + /** + * Implementation of {@link BufferConsumer#complete()}.
+ * Not thread safe, caller must handle concurrency. + */ + public void complete() { + if (expectedLength > lengthSoFar) { + throw new IllegalStateException("Received fewer bytes than specified by Content-Length"); + } + complete = true; + expectedLength = lengthSoFar; + if (subscribers != null) { + for (BufferConsumer subscriber : subscribers) { + subscriber.complete(); + } + } + if (fullSubscribers != null) { + boolean last = reserved <= 0; + for (Iterator> iterator = fullSubscribers.iterator(); iterator.hasNext(); ) { + DelayedExecutionFlow fullSubscriber = iterator.next(); + fullSubscriber.complete(subscribeFullResult(last && !iterator.hasNext())); + } + } + } + + /** + * Implementation of {@link BufferConsumer#error(Throwable)}.
+ * Not thread safe, caller must handle concurrency. + * + * @param e The error + */ + public void error(Throwable e) { + error = e; + discardBuffer(); + if (subscribers != null) { + for (BufferConsumer subscriber : subscribers) { + subscriber.error(e); + } + } + if (fullSubscribers != null) { + for (DelayedExecutionFlow fullSubscriber : fullSubscribers) { + fullSubscriber.completeExceptionally(e); + } + } + } + + /** + * {@link BufferConsumer} that can subscribe to a {@link BaseSharedBuffer} and return the + * buffer as a {@link Flux}. Used to implement {@link ByteBody#toByteBufferPublisher()} and + * similar methods.
+ * Subclass implements the specific {@link BufferConsumer} {@code add} method and + * {@link #size(Object)}. + * + * @param The buffer type + */ + public abstract static class AsFlux implements BufferConsumer { + private final BaseSharedBuffer sharedBuffer; + private final AtomicLong unconsumed = new AtomicLong(0); + private final Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + + public AsFlux(BaseSharedBuffer sharedBuffer) { + this.sharedBuffer = sharedBuffer; + } + + protected abstract int size(B buf); + + public final boolean add0(B buf) { + long newLength = unconsumed.addAndGet(size(buf)); + if (newLength > sharedBuffer.getLimits().maxBufferSize()) { + sink.tryEmitError(new BufferLengthExceededException(sharedBuffer.getLimits().maxBufferSize(), newLength)); + return false; + } else { + return sink.tryEmitNext(buf) == Sinks.EmitResult.OK; + } + } + + @Override + public final void complete() { + sink.tryEmitComplete(); + } + + @Override + public final void error(Throwable e) { + sink.tryEmitError(e); + } + + public final Flux asFlux(Upstream upstream) { + return sink.asFlux() + .doOnSubscribe(s -> upstream.start()) + .doOnNext(bb -> { + int size = size(bb); + unconsumed.addAndGet(-size); + upstream.onBytesConsumed(size); + }) + .doOnCancel(() -> { + upstream.allowDiscard(); + upstream.disregardBackpressure(); + }); + } + } +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/BodySizeLimits.java b/http/src/main/java/io/micronaut/http/body/stream/BodySizeLimits.java similarity index 96% rename from http-netty/src/main/java/io/micronaut/http/netty/body/BodySizeLimits.java rename to http/src/main/java/io/micronaut/http/body/stream/BodySizeLimits.java index 3e33aa22be7..4f0bdb72627 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/BodySizeLimits.java +++ b/http/src/main/java/io/micronaut/http/body/stream/BodySizeLimits.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.netty.body; +package io.micronaut.http.body.stream; import io.micronaut.core.annotation.Internal; diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/BufferConsumer.java b/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java similarity index 84% rename from http-netty/src/main/java/io/micronaut/http/netty/body/BufferConsumer.java rename to http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java index 406ad853d30..0cee66edc5d 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/BufferConsumer.java +++ b/http/src/main/java/io/micronaut/http/body/stream/BufferConsumer.java @@ -13,29 +13,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.netty.body; +package io.micronaut.http.body.stream; import io.micronaut.core.annotation.Internal; import io.micronaut.http.body.ByteBody; -import io.netty.buffer.ByteBuf; /** - * This is a netty-specific reactor-like API for streaming bytes. It's a bit better than reactor - * because it's more explicit about reference counting semantics, has more fine-grained controls - * for cancelling, and has more relaxed concurrency semantics. + * This is a reactor-like API for streaming bytes. It's a bit better than reactor because it's more + * explicit about reference counting semantics, has more fine-grained controls for cancelling, and + * has more relaxed concurrency semantics.
+ * This interface is buffer type agnostic. For specific buffer types (e.g. netty {@code ByteBuf}) + * there is a specific subinterface. * - * @since 4.5.0 + * @since 4.8.0 * @author Jonas Konrad */ @Internal public interface BufferConsumer { - /** - * Consume a buffer. Release ownership is transferred to this consumer. - * - * @param buf The buffer to consume - */ - void add(ByteBuf buf); - /** * Signal normal completion of the stream. */ diff --git a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java index 2dafbdf3e3e..f76b8dd4858 100644 --- a/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java +++ b/http/src/main/java/io/micronaut/http/body/stream/InputStreamByteBody.java @@ -53,10 +53,6 @@ private InputStreamByteBody(Context context, ExtendedInputStream stream) { this.stream = stream; } - static void failClaim() { - throw new IllegalStateException("Request body has already been claimed: Two conflicting sites are trying to access the request body. If this is intentional, the first user must ByteBody#split the body. To find out where the body was claimed, turn on TRACE logging for io.micronaut.http.server.netty.body.NettyByteBody."); - } - /** * Create a new stream-based {@link CloseableByteBody}. Ownership of the stream is transferred * to the returned body. @@ -78,6 +74,9 @@ public static CloseableByteBody create(@NonNull InputStream stream, @NonNull Opt @Override public @NonNull CloseableByteBody allowDiscard() { + if (stream == null) { + BaseSharedBuffer.failClaim(); + } stream.allowDiscard(); return this; } @@ -93,7 +92,7 @@ public void close() { @Override public @NonNull CloseableByteBody split(SplitBackpressureMode backpressureMode) { if (stream == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } StreamPair.Pair pair = StreamPair.createStreamPair(stream, backpressureMode); stream = pair.left(); @@ -109,9 +108,10 @@ public void close() { public @NonNull ExtendedInputStream toInputStream() { ExtendedInputStream s = stream; if (s == null) { - failClaim(); + BaseSharedBuffer.failClaim(); } stream = null; + BaseSharedBuffer.logClaim(); return s; } diff --git a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java b/http/src/main/java/io/micronaut/http/body/stream/PublisherAsBlocking.java similarity index 94% rename from http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java rename to http/src/main/java/io/micronaut/http/body/stream/PublisherAsBlocking.java index b32d14f8b7f..18139f81ea6 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/PublisherAsBlocking.java +++ b/http/src/main/java/io/micronaut/http/body/stream/PublisherAsBlocking.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.netty; +package io.micronaut.http.body.stream; import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.Nullable; -import io.netty.util.ReferenceCountUtil; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.scheduler.NonBlocking; @@ -35,7 +34,7 @@ * @author Jonas Konrad */ @Internal -public final class PublisherAsBlocking implements Subscriber, Closeable { +public class PublisherAsBlocking implements Subscriber, Closeable { private final Lock lock = new ReentrantLock(); private final Condition newDataCondition = lock.newCondition(); /** @@ -64,6 +63,10 @@ public final class PublisherAsBlocking implements Subscriber, Closeable { */ private Throwable failure; + protected void release(T item) { + // optional resource management for subclasses + } + /** * The failure from {@link #onError(Throwable)}. When {@link #take()} returns {@code null}, this * may be set if the reactive stream ended in failure. @@ -96,7 +99,7 @@ public void onNext(T o) { lock.lock(); try { if (closed) { - ReferenceCountUtil.release(o); + release(o); return; } swap = o; @@ -111,7 +114,7 @@ public void onError(Throwable t) { lock.lock(); try { if (swap != null) { - ReferenceCountUtil.release(swap); + release(swap); swap = null; } failure = t; @@ -181,7 +184,7 @@ public void close() { try { closed = true; if (swap != null) { - ReferenceCountUtil.release(swap); + release(swap); swap = null; } } finally { diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/UpstreamBalancer.java b/http/src/main/java/io/micronaut/http/body/stream/UpstreamBalancer.java similarity index 92% rename from http-netty/src/main/java/io/micronaut/http/netty/body/UpstreamBalancer.java rename to http/src/main/java/io/micronaut/http/body/stream/UpstreamBalancer.java index 966711c8fb8..38596773155 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/UpstreamBalancer.java +++ b/http/src/main/java/io/micronaut/http/body/stream/UpstreamBalancer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.micronaut.http.netty.body; +package io.micronaut.http.body.stream; import io.micronaut.core.annotation.Internal; import io.micronaut.http.body.ByteBody; @@ -46,7 +46,7 @@ * only need to test for the first case. */ @Internal -final class UpstreamBalancer { +public final class UpstreamBalancer { private static final AtomicLongFieldUpdater DELTA = AtomicLongFieldUpdater.newUpdater(UpstreamBalancer.class, "delta"); private static final AtomicIntegerFieldUpdater FLAGS = AtomicIntegerFieldUpdater.newUpdater(UpstreamBalancer.class, "flags"); @@ -72,16 +72,22 @@ private UpstreamBalancer(BufferConsumer.Upstream upstream) { /** * Implementation of {@link io.micronaut.http.body.ByteBody.SplitBackpressureMode#SLOWEST}. + * + * @param upstream The original upstream + * @return The balanced upstreams */ - static UpstreamPair slowest(BufferConsumer.Upstream upstream) { + public static UpstreamPair slowest(BufferConsumer.Upstream upstream) { UpstreamBalancer balancer = new UpstreamBalancer(upstream); return new UpstreamPair(balancer.new SlowestUpstreamImpl(false), balancer.new SlowestUpstreamImpl(true)); } /** * Implementation of {@link io.micronaut.http.body.ByteBody.SplitBackpressureMode#FASTEST}. + * + * @param upstream The original upstream + * @return The balanced upstreams */ - static UpstreamPair fastest(BufferConsumer.Upstream upstream) { + public static UpstreamPair fastest(BufferConsumer.Upstream upstream) { UpstreamBalancer balancer = new UpstreamBalancer(upstream); return new UpstreamPair(balancer.new FastestUpstreamImpl(false), balancer.new FastestUpstreamImpl(true)); } @@ -89,8 +95,11 @@ static UpstreamPair fastest(BufferConsumer.Upstream upstream) { /** * Implementation of {@link io.micronaut.http.body.ByteBody.SplitBackpressureMode#ORIGINAL} and * {@link io.micronaut.http.body.ByteBody.SplitBackpressureMode#NEW}. + * + * @param upstream The original upstream + * @return The balanced upstreams */ - static UpstreamPair first(BufferConsumer.Upstream upstream) { + public static UpstreamPair first(BufferConsumer.Upstream upstream) { UpstreamBalancer balancer = new UpstreamBalancer(upstream); return new UpstreamPair(balancer.new PassthroughUpstreamImpl(), balancer.new IgnoringUpstreamImpl()); } @@ -99,8 +108,12 @@ static UpstreamPair first(BufferConsumer.Upstream upstream) { * Create a pair of {@link BufferConsumer.Upstream} * instances that delegates to the given {@code upstream} according to the semantics of the * given {@code mode}. + * + * @param upstream The original upstream + * @param mode The balancing mode + * @return The balanced upstreams */ - static UpstreamPair balancer(BufferConsumer.Upstream upstream, ByteBody.SplitBackpressureMode mode) { + public static UpstreamPair balancer(BufferConsumer.Upstream upstream, ByteBody.SplitBackpressureMode mode) { return switch (mode) { case SLOWEST -> slowest(upstream); case FASTEST -> fastest(upstream);