diff --git a/CHANGELOG.md b/CHANGELOG.md index a4e920f3a202d..0648b2cb8a5bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) - Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293)) +- [Streaming Indexing] Enhance RestAction with request / response streaming support ([#13772](https://github.com/opensearch-project/OpenSearch/pull/13772)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java new file mode 100644 index 0000000000000..df8d5a3e6b441 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/HttpContentSender.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; + +/** + * The generic interface for chunked {@link HttpContent} producers (response streaming). + */ +interface HttpContentSender extends Publisher { + void send(HttpContent content, ActionListener listener, boolean isLast); +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java new file mode 100644 index 0000000000000..d9c669ffea056 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.ByteBuf; + +class ReactorNetty4HttpChunk implements HttpChunk { + private final AtomicBoolean released; + private final boolean pooled; + private final ByteBuf content; + private final boolean last; + + ReactorNetty4HttpChunk(ByteBuf content, boolean last) { + this(new AtomicBoolean(false), true, content, last); + } + + private ReactorNetty4HttpChunk(AtomicBoolean released, boolean pooled, ByteBuf content, boolean last) { + this.content = content; + this.pooled = pooled; + this.released = released; + this.last = last; + } + + @Override + public BytesReference content() { + assert released.get() == false; + return Netty4Utils.toBytesReference(content); + } + + @Override + public void release() { + if (pooled && released.compareAndSet(false, true)) { + content.release(); + } + } + + @Override + public boolean isLast() { + return last; + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java index bd1646d753016..bae3f068ee4a1 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransport.java @@ -26,6 +26,8 @@ import org.opensearch.http.HttpServerChannel; import org.opensearch.http.reactor.netty4.ssl.SslUtils; import org.opensearch.plugins.SecureHttpTransportSettingsProvider; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest.Method; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.reactor.SharedGroupFactory; @@ -351,24 +353,42 @@ public List protocols() { * @return response publisher */ protected Publisher incomingRequest(HttpServerRequest request, HttpServerResponse response) { - final NonStreamingRequestConsumer consumer = new NonStreamingRequestConsumer<>( - this, - request, - response, - maxCompositeBufferComponents - ); + final Method method = HttpConversionUtil.convertMethod(request.method()); + if (dispatcher.dispatchHandler(request.uri(), request.fullPath(), method, request.params()) + .map(RestHandler::supportsStreaming) + .orElse(false)) { + final ReactorNetty4StreamingRequestConsumer consumer = new ReactorNetty4StreamingRequestConsumer<>( + this, + request, + response + ); - request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); - - return Mono.from(consumer).flatMap(hc -> { - final FullHttpResponse r = (FullHttpResponse) hc; - response.status(r.status()); - response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); - response.chunkedTransfer(false); - response.compression(true); - r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); - return Mono.from(response.sendObject(r.content())); - }); + request.receiveContent() + .switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)) + .subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT)); + consumer.start(); + + return response.sendObject(consumer); + } else { + final ReactorNetty4NonStreamingRequestConsumer consumer = new ReactorNetty4NonStreamingRequestConsumer<>( + this, + request, + response, + maxCompositeBufferComponents + ); + + request.receiveContent().switchIfEmpty(Mono.just(DefaultLastHttpContent.EMPTY_LAST_CONTENT)).subscribe(consumer); + + return Mono.from(consumer).flatMap(hc -> { + final FullHttpResponse r = (FullHttpResponse) hc; + response.status(r.status()); + response.trailerHeaders(c -> r.trailingHeaders().forEach(h -> c.add(h.getKey(), h.getValue()))); + response.chunkedTransfer(false); + response.compression(true); + r.headers().forEach(h -> response.addHeader(h.getKey(), h.getValue())); + return Mono.from(response.sendObject(r.content())); + }); + } } /** diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java similarity index 92% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java index 98b359319ff1b..7df0b3c0c35fe 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingHttpChannel.java @@ -23,13 +23,13 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingHttpChannel implements HttpChannel { +class ReactorNetty4NonStreamingHttpChannel implements HttpChannel { private final HttpServerRequest request; private final HttpServerResponse response; private final CompletableContext closeContext = new CompletableContext<>(); private final FluxSink emitter; - NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { + ReactorNetty4NonStreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, FluxSink emitter) { this.request = request; this.response = response; this.emitter = emitter; diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java similarity index 89% rename from plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java rename to plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java index d43e23e800e65..c09e7755b1670 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/NonStreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4NonStreamingRequestConsumer.java @@ -25,7 +25,7 @@ import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; -class NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { +class ReactorNetty4NonStreamingRequestConsumer implements Consumer, Publisher, Disposable { private final HttpServerRequest request; private final HttpServerResponse response; private final CompositeByteBuf content; @@ -34,7 +34,7 @@ class NonStreamingRequestConsumer implements Consumer, private final AtomicBoolean disposed = new AtomicBoolean(false); private volatile FluxSink emitter; - NonStreamingRequestConsumer( + ReactorNetty4NonStreamingRequestConsumer( AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response, @@ -64,12 +64,12 @@ public void accept(T message) { } } - public void process(HttpContent in, FluxSink emitter) { + void process(HttpContent in, FluxSink emitter) { // Consume request body in full before dispatching it content.addComponent(true, in.content().retain()); if (in instanceof LastHttpContent) { - final NonStreamingHttpChannel channel = new NonStreamingHttpChannel(request, response, emitter); + final ReactorNetty4NonStreamingHttpChannel channel = new ReactorNetty4NonStreamingHttpChannel(request, response, emitter); final HttpRequest r = createRequest(request, content); try { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java new file mode 100644 index 0000000000000..ee07307345fd8 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.common.concurrent.CompletableContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.http.HttpChunk; +import org.opensearch.http.HttpResponse; +import org.opensearch.http.StreamingHttpChannel; +import org.opensearch.transport.reactor.netty4.Netty4Utils; + +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.FlowAdapters; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingHttpChannel implements StreamingHttpChannel { + private final HttpServerRequest request; + private final HttpServerResponse response; + private final CompletableContext closeContext = new CompletableContext<>(); + private final Publisher receiver; + private final HttpContentSender sender; + private volatile FluxSink producer; + private volatile boolean lastChunkReceived = false; + + ReactorNetty4StreamingHttpChannel(HttpServerRequest request, HttpServerResponse response, HttpContentSender sender) { + this.request = request; + this.response = response; + this.sender = sender; + this.receiver = Flux.create(producer -> this.producer = producer); + this.request.withConnection(connection -> Netty4Utils.addListener(connection.channel().closeFuture(), closeContext)); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() { + request.withConnection(connection -> connection.channel().close()); + } + + @Override + public void addCloseListener(ActionListener listener) { + closeContext.addListener(ActionListener.toBiConsumer(listener)); + } + + @Override + public void sendChunk(HttpChunk chunk, ActionListener listener) { + sender.send(createContent(chunk), listener, chunk.isLast()); + } + + @Override + public void sendResponse(HttpResponse response, ActionListener listener) { + sender.send(createContent(response), listener, true); + } + + @Override + public void prepareResponse(int status, Map> headers) { + this.response.status(status); + headers.forEach((k, vs) -> vs.forEach(v -> this.response.addHeader(k, v))); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) response.remoteAddress(); + } + + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) response.hostAddress(); + } + + @Override + public void receiveChunk(HttpChunk message) { + if (lastChunkReceived) { + return; + } + + producer.next(message); + if (message.isLast()) { + lastChunkReceived = true; + producer.complete(); + } + } + + @Override + public void subscribe(java.util.concurrent.Flow.Subscriber s) { + receiver.subscribe(FlowAdapters.toSubscriber(s)); + } + + private static HttpContent createContent(HttpResponse response) { + final FullHttpResponse fullHttpResponse = (FullHttpResponse) response; + return new DefaultHttpContent(fullHttpResponse.content()); + } + + private static HttpContent createContent(HttpChunk chunk) { + return new DefaultHttpContent(Unpooled.copiedBuffer(BytesReference.toBytes(chunk.content()))); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java new file mode 100644 index 0000000000000..4e86b7d9d3173 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.http.AbstractHttpServerTransport; +import org.opensearch.http.HttpChunk; +import org.opensearch.http.HttpRequest; +import org.opensearch.http.StreamingHttpChannel; + +import java.util.function.Consumer; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.netty.http.server.HttpServerRequest; +import reactor.netty.http.server.HttpServerResponse; + +class ReactorNetty4StreamingRequestConsumer implements Consumer, Publisher { + private final AbstractHttpServerTransport transport; + private final HttpServerRequest request; + private final HttpContentSender sender; + private final StreamingHttpChannel httpChannel; + + ReactorNetty4StreamingRequestConsumer(AbstractHttpServerTransport transport, HttpServerRequest request, HttpServerResponse response) { + this.transport = transport; + this.request = request; + this.sender = new ReactorNetty4StreamingResponseProducer(); + this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender); + } + + @Override + public void accept(T message) { + if (message instanceof LastHttpContent) { + httpChannel.receiveChunk(createChunk(message, true)); + } else if (message instanceof HttpContent) { + httpChannel.receiveChunk(createChunk(message, false)); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } + + void start() { + transport.incomingStream(createRequest(request), httpChannel); + } + + HttpRequest createRequest(HttpServerRequest request) { + return new ReactorNetty4HttpRequest(request, Unpooled.EMPTY_BUFFER); + } + + HttpChunk createChunk(HttpContent chunk, boolean last) { + return new ReactorNetty4HttpChunk(chunk.content(), last); + } +} diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java new file mode 100644 index 0000000000000..b636f3a3c6ac9 --- /dev/null +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingResponseProducer.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http.reactor.netty4; + +import org.opensearch.core.action.ActionListener; + +import io.netty.handler.codec.http.HttpContent; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +class ReactorNetty4StreamingResponseProducer implements HttpContentSender { + private final Publisher sender; + private volatile FluxSink emitter; + + ReactorNetty4StreamingResponseProducer() { + this.sender = Flux.create(emitter -> this.emitter = emitter); + } + + @Override + public void send(HttpContent content, ActionListener listener, boolean isLast) { + try { + emitter.next(content); + listener.onResponse(null); + if (isLast) { + emitter.complete(); + } + } catch (final Exception ex) { + emitter.error(ex); + listener.onFailure(ex); + } + } + + @Override + public void subscribe(Subscriber s) { + sender.subscribe(s); + } +} diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index 257aca2b67990..cd472d8e7e06f 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -357,6 +357,16 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { logger.trace(() -> new ParameterizedMessage("Http channel accepted: {}", httpChannel)); } + /** + * This method handles an incoming http request as a stream. + * + * @param httpRequest that is incoming + * @param httpChannel that received the http request + */ + public void incomingStream(HttpRequest httpRequest, final StreamingHttpChannel httpChannel) { + handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException()); + } + /** * This method handles an incoming http request. * diff --git a/server/src/main/java/org/opensearch/http/HttpChunk.java b/server/src/main/java/org/opensearch/http/HttpChunk.java new file mode 100644 index 0000000000000..32438a21b799d --- /dev/null +++ b/server/src/main/java/org/opensearch/http/HttpChunk.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.bytes.BytesReference; + +/** + * Represents a chunk of the HTTP request / response stream + */ +@PublicApi(since = "2.15.0") +public interface HttpChunk { + /** + * Signals this is the last chunk of the stream. + * @return "true" if this is the last chunk of the stream, "false" otherwise + */ + boolean isLast(); + + /** + * Returns the content of this chunk + * @return the content of this chunk + */ + BytesReference content(); + + /** + * Releases all possible resources associated with this chunk + */ + void release(); +} diff --git a/server/src/main/java/org/opensearch/http/HttpServerTransport.java b/server/src/main/java/org/opensearch/http/HttpServerTransport.java index 012b69c29c1d4..f58d604151fd0 100644 --- a/server/src/main/java/org/opensearch/http/HttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/HttpServerTransport.java @@ -38,8 +38,12 @@ import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.service.ReportingService; import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestHandler; import org.opensearch.rest.RestRequest; +import java.util.Map; +import java.util.Optional; + /** * HTTP Transport server * @@ -61,6 +65,17 @@ public interface HttpServerTransport extends LifecycleComponent, ReportingServic * Dispatches HTTP requests. */ interface Dispatcher { + /** + * Finds the matching {@link RestHandler} that the request is going to be dispatched to, if any. + * @param uri request URI + * @param rawPath request raw path + * @param method request HTTP method + * @param params request parameters + * @return matching {@link RestHandler} that the request is going to be dispatched to, {@code Optional.empty()} if none match + */ + default Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + return Optional.empty(); + } /** * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if diff --git a/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java new file mode 100644 index 0000000000000..6258bf3f250a3 --- /dev/null +++ b/server/src/main/java/org/opensearch/http/StreamingHttpChannel.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.http; + +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.action.ActionListener; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Flow.Publisher; + +/** + * Represents an HTTP communication channel with streaming capabiltiies. + * + * @opensearch.api + */ +@PublicApi(since = "2.15.0") +public interface StreamingHttpChannel extends HttpChannel, Publisher { + /** + * Sends the next {@link HttpChunk} to the response stream + * @param chunk response chunk to send to channel + */ + void sendChunk(HttpChunk chunk, ActionListener listener); + + /** + * Receives the next {@link HttpChunk} from the request stream + * @param chunk next {@link HttpChunk} + */ + void receiveChunk(HttpChunk chunk); + + /** + * Prepares response before kicking of content streaming + * @param status response status + * @param headers response headers + */ + void prepareResponse(int status, Map> headers); +} diff --git a/server/src/main/java/org/opensearch/rest/RestController.java b/server/src/main/java/org/opensearch/rest/RestController.java index 95abb9b3daeca..7842e478f1062 100644 --- a/server/src/main/java/org/opensearch/rest/RestController.java +++ b/server/src/main/java/org/opensearch/rest/RestController.java @@ -71,6 +71,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -257,6 +258,32 @@ public void registerHandler(final RestHandler restHandler) { ); } + @Override + public Optional dispatchHandler(String uri, String rawPath, RestRequest.Method method, Map params) { + // Loop through all possible handlers, attempting to dispatch the request + final Iterator allHandlers = getAllRestMethodHandlers(params, rawPath); + + while (allHandlers.hasNext()) { + final RestHandler handler; + final RestMethodHandlers handlers = allHandlers.next(); + if (handlers == null) { + handler = null; + } else { + handler = handlers.getHandler(method); + } + if (handler == null) { + final Set validMethodSet = getValidHandlerMethodSet(rawPath); + if (validMethodSet.contains(method) == false) { + return Optional.empty(); + } + } else { + return Optional.of(handler); + } + } + + return Optional.empty(); + } + @Override public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { try { diff --git a/server/src/main/java/org/opensearch/rest/RestHandler.java b/server/src/main/java/org/opensearch/rest/RestHandler.java index 877afdd951088..1139e5fc65f31 100644 --- a/server/src/main/java/org/opensearch/rest/RestHandler.java +++ b/server/src/main/java/org/opensearch/rest/RestHandler.java @@ -72,6 +72,14 @@ default boolean supportsContentStream() { return false; } + /** + * Indicates if the RestHandler supports request / response streaming. Please note that the transport engine has to support + * streaming as well. + */ + default boolean supportsStreaming() { + return false; + } + /** * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the