diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java index 25b335eae0bf1..3af4227bf46ca 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequest.java @@ -287,7 +287,7 @@ public BulkRequest add( String routing = valueOrDefault(defaultRouting, globalRouting); String pipeline = valueOrDefault(defaultPipeline, globalPipeline); Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias); - new BulkRequestParser(true).parse( + new BulkRequestParser().parse( data, defaultIndex, routing, @@ -296,7 +296,7 @@ public BulkRequest add( requireAlias, allowExplicitIndex, xContentType, - (indexRequest, type) -> internalAdd(indexRequest), + this::internalAdd, this::internalAdd, this::add ); diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java index 675905cc60e75..212450515b57e 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkRequestParser.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -67,7 +66,6 @@ public final class BulkRequestParser { private static final ParseField INDEX = new ParseField("_index"); - private static final ParseField TYPE = new ParseField("_type"); private static final ParseField ID = new ParseField("_id"); private static final ParseField ROUTING = new ParseField("routing"); private static final ParseField OP_TYPE = new ParseField("op_type"); @@ -80,17 +78,6 @@ public final class BulkRequestParser { private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS); - // TODO: Remove this parameter once the BulkMonitoring endpoint has been removed - private final boolean errorOnType; - - /** - * Create a new parser. - * @param errorOnType whether to allow _type information in the index line; used by BulkMonitoring - */ - public BulkRequestParser(boolean errorOnType) { - this.errorOnType = errorOnType; - } - private static int findNextMarker(byte marker, int from, BytesReference data) { final int res = data.indexOf(marker, from); if (res != -1) { @@ -136,7 +123,7 @@ public void parse( @Nullable Boolean defaultRequireAlias, boolean allowExplicitIndex, XContentType xContentType, - BiConsumer indexRequestConsumer, + Consumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer ) throws IOException { @@ -192,7 +179,6 @@ public void parse( String action = parser.currentName(); String index = defaultIndex; - String type = null; String id = null; String routing = defaultRouting; FetchSourceContext fetchSourceContext = defaultFetchSourceContext; @@ -205,7 +191,7 @@ public void parse( String pipeline = defaultPipeline; boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; - // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) + // at this stage, next token can either be END_OBJECT (and use default index with auto generated id) // or START_OBJECT which will have another set of parameters token = parser.nextToken(); @@ -220,13 +206,6 @@ public void parse( throw new IllegalArgumentException("explicit index in bulk is not allowed"); } index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); - } else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) { - if (errorOnType) { - throw new IllegalArgumentException( - "Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]" - ); - } - type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { id = parser.text(); } else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) { @@ -322,8 +301,7 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias), - type + .setRequireAlias(requireAlias) ); } else { indexRequestConsumer.accept( @@ -336,8 +314,7 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias), - type + .setRequireAlias(requireAlias) ); } } else if ("create".equals(action)) { @@ -351,8 +328,7 @@ public void parse( .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) - .setRequireAlias(requireAlias), - type + .setRequireAlias(requireAlias) ); } else if ("update".equals(action)) { if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java index 239bb19c5f6ad..d3da77112408b 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkRequestParserTests.java @@ -47,9 +47,9 @@ public class BulkRequestParserTests extends OpenSearchTestCase { public void testIndexRequest() throws IOException { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, indexRequest -> { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); @@ -67,7 +67,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, + indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -82,7 +82,7 @@ public void testIndexRequest() throws IOException { null, false, XContentType.JSON, - (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, + indexRequest -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -97,7 +97,7 @@ public void testIndexRequest() throws IOException { true, false, XContentType.JSON, - (indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); }, + indexRequest -> { assertFalse(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail() ); @@ -105,34 +105,22 @@ public void testIndexRequest() throws IOException { public void testDeleteRequest() throws IOException { BytesArray request = new BytesArray("{ \"delete\":{ \"_id\": \"bar\" } }\n"); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse( - request, - "foo", - null, - null, - null, - null, - false, - XContentType.JSON, - (req, type) -> fail(), - req -> fail(), - deleteRequest -> { - assertFalse(parsed.get()); - assertEquals("foo", deleteRequest.index()); - assertEquals("bar", deleteRequest.id()); - parsed.set(true); - } - ); + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), deleteRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", deleteRequest.index()); + assertEquals("bar", deleteRequest.id()); + parsed.set(true); + }); assertTrue(parsed.get()); } public void testUpdateRequest() throws IOException { BytesArray request = new BytesArray("{ \"update\":{ \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { + parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, req -> fail(), updateRequest -> { assertFalse(parsed.get()); assertEquals("foo", updateRequest.index()); assertEquals("bar", updateRequest.id()); @@ -150,7 +138,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - (req, type) -> fail(), + req -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -165,7 +153,7 @@ public void testUpdateRequest() throws IOException { null, false, XContentType.JSON, - (req, type) -> fail(), + req -> fail(), updateRequest -> { assertTrue(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -180,7 +168,7 @@ public void testUpdateRequest() throws IOException { true, false, XContentType.JSON, - (req, type) -> fail(), + req -> fail(), updateRequest -> { assertFalse(updateRequest.isRequireAlias()); }, req -> fail() ); @@ -188,7 +176,7 @@ public void testUpdateRequest() throws IOException { public void testBarfOnLackOfTrailingNewline() { BytesArray request = new BytesArray("{ \"index\":{ \"_id\": \"bar\" } }\n{}"); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> parser.parse( @@ -200,7 +188,7 @@ public void testBarfOnLackOfTrailingNewline() { null, false, XContentType.JSON, - (indexRequest, type) -> fail(), + indexRequest -> fail(), req -> fail(), req -> fail() ) @@ -210,46 +198,21 @@ public void testBarfOnLackOfTrailingNewline() { public void testFailOnExplicitIndex() { BytesArray request = new BytesArray("{ \"index\":{ \"_index\": \"foo\", \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> parser.parse( - request, - null, - null, - null, - null, - null, - false, - XContentType.JSON, - (req, type) -> fail(), - req -> fail(), - req -> fail() - ) + () -> parser.parse(request, null, null, null, null, null, false, XContentType.JSON, req -> fail(), req -> fail(), req -> fail()) ); assertEquals("explicit index in bulk is not allowed", ex.getMessage()); } - public void testTypesStillParsedForBulkMonitoring() throws IOException { - BytesArray request = new BytesArray("{ \"index\":{ \"_type\": \"quux\", \"_id\": \"bar\" } }\n{}\n"); - BulkRequestParser parser = new BulkRequestParser(false); - final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { - assertFalse(parsed.get()); - assertEquals("foo", indexRequest.index()); - assertEquals("bar", indexRequest.id()); - parsed.set(true); - }, req -> fail(), req -> fail()); - assertTrue(parsed.get()); - } - public void testParseDeduplicatesParameterStrings() throws IOException { BytesArray request = new BytesArray( "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n" + "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n" ); - BulkRequestParser parser = new BulkRequestParser(randomBoolean()); + BulkRequestParser parser = new BulkRequestParser(); final List indexRequests = new ArrayList<>(); parser.parse( request, @@ -260,7 +223,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException { null, true, XContentType.JSON, - (indexRequest, type) -> indexRequests.add(indexRequest), + indexRequest -> indexRequests.add(indexRequest), req -> fail(), req -> fail() );