diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java index 7d478f6173..efdd62e07e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java @@ -7,9 +7,34 @@ import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.dataprepper.model.event.EventHandle; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + import static com.google.common.base.Preconditions.checkNotNull; public class BulkOperationWrapper { + private static final Predicate IS_INDEX_OPERATION = BulkOperation::isIndex; + private static final Predicate IS_CREATE_OPERATION = BulkOperation::isCreate; + + private static final Map, Function> BULK_OPERATION_TO_DOCUMENT_CONVERTERS = Map.of( + IS_INDEX_OPERATION, operation -> operation.index().document(), + IS_CREATE_OPERATION, operation -> operation.create().document() + ); + + private static final Map, Function> BULK_OPERATION_TO_INDEX_NAME_CONVERTERS = Map.of( + IS_INDEX_OPERATION, operation -> operation.index().index(), + IS_CREATE_OPERATION, operation -> operation.create().index() + ); + + private static final Map, Function> BULK_OPERATION_TO_ID_CONVERTERS = Map.of( + IS_INDEX_OPERATION, operation -> operation.index().id(), + IS_CREATE_OPERATION, operation -> operation.create().id() + ); + private final EventHandle eventHandle; private final BulkOperation bulkOperation; @@ -37,4 +62,29 @@ public void releaseEventHandle(boolean result) { eventHandle.release(result); } } + + public Object getDocument() { + return getValueFromConverter(BULK_OPERATION_TO_DOCUMENT_CONVERTERS); + } + + public String getIndex() { + return getValueFromConverter(BULK_OPERATION_TO_INDEX_NAME_CONVERTERS); + } + + public String getId() { + return getValueFromConverter(BULK_OPERATION_TO_ID_CONVERTERS); + } + + private T getValueFromConverter(final Map, Function> converters) { + final List values = converters.entrySet().stream() + .filter(entry -> entry.getKey().test(bulkOperation)) + .map(entry -> entry.getValue().apply(bulkOperation)) + .collect(Collectors.toList()); + + if (values.size() != 1) { + throw new UnsupportedOperationException("Only index or create operations are supported currently." + bulkOperation); + } + + return values.get(0); + } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java index 38e318f138..d2661029fc 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkOperationWriter.java @@ -34,9 +34,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper; import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData; /** @@ -47,10 +47,10 @@ public class BulkOperationWriter { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static String bulkOperationToString(BulkOperation bulkOperation) { - String index = bulkOperation.index().index(); + public static String bulkOperationToString(BulkOperationWrapper bulkOperation) { + String index = bulkOperation.getIndex(); String source = extractDocumentSource(bulkOperation); - String id = bulkOperation.index().id(); + String id = bulkOperation.getId(); String sSource = "_na_"; try { @@ -86,8 +86,8 @@ private static String extractDocumentSource(final FailedDlqData failedData) { } } - private static String extractDocumentSource(BulkOperation bulkOperation) { - final SerializedJson document = (SerializedJson) bulkOperation.index().document(); + private static String extractDocumentSource(BulkOperationWrapper bulkOperation) { + final SerializedJson document = (SerializedJson) bulkOperation.getDocument(); return new String(document.getSerializedJson()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java index 24cdb5ed6d..40dc699a70 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java @@ -126,15 +126,7 @@ private long estimateBulkSize() { } private Object mapBulkOperationToDocument(final BulkOperationWrapper bulkOperation) { - Object anyDocument; - - if (bulkOperation.getBulkOperation().isIndex()) { - anyDocument = bulkOperation.getBulkOperation().index().document(); - } else if (bulkOperation.getBulkOperation().isCreate()) { - anyDocument = bulkOperation.getBulkOperation().create().document(); - } else { - throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation); - } + final Object anyDocument = bulkOperation.getDocument(); if (anyDocument == null) { return new SerializedJsonImpl(null); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java index 6775a51560..e8107e7e66 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java @@ -69,16 +69,7 @@ public BulkRequest getRequest() { } private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) { - - Object anyDocument; - - if (bulkOperation.getBulkOperation().isIndex()) { - anyDocument = bulkOperation.getBulkOperation().index().document(); - } else if (bulkOperation.getBulkOperation().isCreate()) { - anyDocument = bulkOperation.getBulkOperation().create().document(); - } else { - throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation); - } + final Object anyDocument = bulkOperation.getDocument(); if (anyDocument == null) return OPERATION_OVERHEAD; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java index b1ae41701b..bb53963e7e 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java @@ -2,7 +2,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; -import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; @@ -34,14 +33,13 @@ public FailedBulkOperationConverter(final String pipelineName, final String plug public DlqObject convertToDlqObject(final FailedBulkOperation failedBulkOperation) { final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperation.getBulkOperation(); - final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation(); final BulkResponseItem bulkResponseItem = failedBulkOperation.getBulkResponseItem(); - final Object document = convertDocumentToGenericMap(bulkOperation); + final Object document = convertDocumentToGenericMap(bulkOperationWithHandle); final FailedDlqData.Builder failedDlqDataBuilder = FailedDlqData.builder() - .withIndex(bulkOperation.index().index()) - .withIndexId(bulkOperation.index().id()) + .withIndex(bulkOperationWithHandle.getIndex()) + .withIndexId(bulkOperationWithHandle.getId()) .withDocument(document); if (bulkResponseItem != null) { @@ -61,8 +59,8 @@ public DlqObject convertToDlqObject(final FailedBulkOperation failedBulkOperatio .build(); } - private Object convertDocumentToGenericMap(final BulkOperation bulkOperation) { - final SerializedJson document = (SerializedJson) bulkOperation.index().document(); + private Object convertDocumentToGenericMap(final BulkOperationWrapper bulkOperation) { + final SerializedJson document = (SerializedJson) bulkOperation.getDocument(); final byte[] documentBytes = document.getSerializedJson(); final String jsonString = new String(documentBytes, StandardCharsets.UTF_8); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java index 2b20c86e86..4b03abe6d5 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapperTests.java @@ -5,18 +5,34 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.CreateOperation; +import org.opensearch.client.opensearch.core.bulk.DeleteOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.dataprepper.model.event.EventHandle; import org.junit.jupiter.api.Test; + +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Stream; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; public class BulkOperationWrapperTests { + private static final String ID = UUID.randomUUID().toString(); + private static final String INDEX = UUID.randomUUID().toString(); + private static final String DOCUMENT = UUID.randomUUID().toString(); + private BulkOperation bulkOperation; - BulkOperationWrapper createObjectUnderTest(EventHandle eventHandle) { - bulkOperation = mock(BulkOperation.class); + BulkOperationWrapper createObjectUnderTest(final EventHandle eventHandle, BulkOperation aBulkOperation) { + bulkOperation = Objects.isNull(aBulkOperation) ? mock(BulkOperation.class) : aBulkOperation; if (eventHandle == null) { return new BulkOperationWrapper(bulkOperation); } @@ -25,7 +41,7 @@ BulkOperationWrapper createObjectUnderTest(EventHandle eventHandle) { @Test public void testConstructorWithOneArgument() { - BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(null); + BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(null, null); assertThat(bulkOperationWithHandle.getBulkOperation(), equalTo(bulkOperation)); assertThat(bulkOperationWithHandle.getEventHandle(), equalTo(null)); } @@ -33,8 +49,84 @@ public void testConstructorWithOneArgument() { @Test public void testConstructorWithTwoArguments() { EventHandle eventHandle = mock(EventHandle.class); - BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(eventHandle); + BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(eventHandle, null); assertThat(bulkOperationWithHandle.getBulkOperation(), equalTo(bulkOperation)); assertThat(bulkOperationWithHandle.getEventHandle(), equalTo(eventHandle)); } + + @ParameterizedTest + @MethodSource("bulkOperationProvider") + public void testGetId(final BulkOperation bulkOperation) { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation); + assertThat(bulkOperationWrapper.getId(), equalTo(ID)); + } + + @Test + public void testGetIdUnsupportedAction() { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); + assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getId); + } + + @ParameterizedTest + @MethodSource("bulkOperationProvider") + public void testGetIndex(final BulkOperation bulkOperation) { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation); + assertThat(bulkOperationWrapper.getIndex(), equalTo(INDEX)); + } + + @Test + public void testGetIndexUnsupportedAction() { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); + assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getIndex); + } + + @ParameterizedTest + @MethodSource("bulkOperationProvider") + public void testGetDocument(final BulkOperation bulkOperation) { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation); + assertThat(bulkOperationWrapper.getDocument(), equalTo(DOCUMENT)); + } + + @Test + public void testGetDocumentUnsupportedAction() { + final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation()); + assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getDocument); + } + + private static Stream bulkOperationProvider() { + final IndexOperation indexOperation = new IndexOperation.Builder<>() + .id(ID) + .index(INDEX) + .document(DOCUMENT) + .build(); + final BulkOperation indexBulkOperation = (BulkOperation) new BulkOperation.Builder() + .index(indexOperation) + .build(); + + final CreateOperation createOperation = new CreateOperation.Builder<>() + .id(ID) + .index(INDEX) + .document(DOCUMENT) + .build(); + final BulkOperation createBulkOperation = (BulkOperation) new BulkOperation.Builder() + .create(createOperation) + .build(); + + return Stream.of( + Arguments.of( + indexBulkOperation, + createBulkOperation + ) + ); + } + + private BulkOperation getDeleteBulkOperation() { + final DeleteOperation deleteOperation = new DeleteOperation.Builder() + .id(ID) + .index(INDEX) + .build(); + return new BulkOperation.Builder() + .delete(deleteOperation) + .build(); + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverterTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverterTest.java index 57913b6c8f..aedd2d304a 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverterTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverterTest.java @@ -6,6 +6,7 @@ import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch.core.bulk.BulkOperation; import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.bulk.CreateOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; @@ -52,11 +53,22 @@ public void setup() { failureMessage = UUID.randomUUID().toString(); failure = new Exception(failureMessage); bulkOperation = mock(BulkOperation.class); + final IndexOperation indexOperation = mock(IndexOperation.class); - when(bulkOperation.index()).thenReturn(indexOperation); when(indexOperation.index()).thenReturn(testIndex); when(indexOperation.document()).thenReturn(document); when(indexOperation.id()).thenReturn(testId); + + final CreateOperation createOperation = mock(CreateOperation.class); + when(createOperation.index()).thenReturn(testIndex); + when(createOperation.document()).thenReturn(document); + when(createOperation.id()).thenReturn(testId); + + when(bulkOperation.isIndex()).thenReturn(true); + when(bulkOperation.index()).thenReturn(indexOperation); + when(bulkOperation.isCreate()).thenReturn(false); + when(bulkOperation.create()).thenReturn(createOperation); + final ErrorCause errorCause = mock(ErrorCause.class); bulkResponseItem = mock(BulkResponseItem.class); when(bulkResponseItem.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR.getStatus()); @@ -80,6 +92,22 @@ public void testConvertToDlqObject() { validateResponse(result, errorReason); } + @Test + public void testConvertActionCreateToDlqObject() { + when(bulkOperation.isIndex()).thenReturn(false); + when(bulkOperation.isCreate()).thenReturn(true); + + final FailedBulkOperation testData = FailedBulkOperation.builder() + .withBulkOperation(new BulkOperationWrapper(bulkOperation)) + .withBulkResponseItem(bulkResponseItem) + .withFailure(failure) + .build(); + + final DlqObject result = converter.convertToDlqObject(testData); + + validateResponse(result, errorReason); + } + @Test public void testConvertToDlqObjectWithOnlyFailure() { final FailedBulkOperation testData = FailedBulkOperation.builder()