Skip to content

Commit

Permalink
Consolidate logic related to extracting data from a BulkOperation
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Jul 18, 2023
1 parent 8785bbf commit 831017a
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,34 @@

import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;

public class BulkOperationWrapper {
private static final Predicate<BulkOperation> IS_INDEX_OPERATION = BulkOperation::isIndex;
private static final Predicate<BulkOperation> IS_CREATE_OPERATION = BulkOperation::isCreate;

private static final Map<Predicate<BulkOperation>, Function<BulkOperation, Object>> BULK_OPERATION_TO_DOCUMENT_CONVERTERS = Map.of(
IS_INDEX_OPERATION, operation -> operation.index().document(),
IS_CREATE_OPERATION, operation -> operation.create().document()
);

private static final Map<Predicate<BulkOperation>, Function<BulkOperation, String>> BULK_OPERATION_TO_INDEX_NAME_CONVERTERS = Map.of(
IS_INDEX_OPERATION, operation -> operation.index().index(),
IS_CREATE_OPERATION, operation -> operation.create().index()
);

private static final Map<Predicate<BulkOperation>, Function<BulkOperation, String>> BULK_OPERATION_TO_ID_CONVERTERS = Map.of(
IS_INDEX_OPERATION, operation -> operation.index().id(),
IS_CREATE_OPERATION, operation -> operation.create().id()
);

private final EventHandle eventHandle;
private final BulkOperation bulkOperation;

Expand Down Expand Up @@ -37,4 +62,29 @@ public void releaseEventHandle(boolean result) {
eventHandle.release(result);
}
}

public Object getDocument() {
return getValueFromConverter(BULK_OPERATION_TO_DOCUMENT_CONVERTERS);
}

public String getIndex() {
return getValueFromConverter(BULK_OPERATION_TO_INDEX_NAME_CONVERTERS);
}

public String getId() {
return getValueFromConverter(BULK_OPERATION_TO_ID_CONVERTERS);
}

private <T> T getValueFromConverter(final Map<Predicate<BulkOperation>, Function<BulkOperation, T>> converters) {
final List<T> values = converters.entrySet().stream()
.filter(entry -> entry.getKey().test(bulkOperation))
.map(entry -> entry.getValue().apply(bulkOperation))
.collect(Collectors.toList());

if (values.size() != 1) {
throw new UnsupportedOperationException("Only index or create operations are supported currently." + bulkOperation);
}

return values.get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData;

/**
Expand All @@ -47,10 +47,10 @@ public class BulkOperationWriter {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static String bulkOperationToString(BulkOperation bulkOperation) {
String index = bulkOperation.index().index();
public static String bulkOperationToString(BulkOperationWrapper bulkOperation) {
String index = bulkOperation.getIndex();
String source = extractDocumentSource(bulkOperation);
String id = bulkOperation.index().id();
String id = bulkOperation.getId();

String sSource = "_na_";
try {
Expand Down Expand Up @@ -86,8 +86,8 @@ private static String extractDocumentSource(final FailedDlqData failedData) {
}
}

private static String extractDocumentSource(BulkOperation bulkOperation) {
final SerializedJson document = (SerializedJson) bulkOperation.index().document();
private static String extractDocumentSource(BulkOperationWrapper bulkOperation) {
final SerializedJson document = (SerializedJson) bulkOperation.getDocument();

return new String(document.getSerializedJson());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,7 @@ private long estimateBulkSize() {
}

private Object mapBulkOperationToDocument(final BulkOperationWrapper bulkOperation) {
Object anyDocument;

if (bulkOperation.getBulkOperation().isIndex()) {
anyDocument = bulkOperation.getBulkOperation().index().document();
} else if (bulkOperation.getBulkOperation().isCreate()) {
anyDocument = bulkOperation.getBulkOperation().create().document();
} else {
throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation);
}
final Object anyDocument = bulkOperation.getDocument();

if (anyDocument == null) {
return new SerializedJsonImpl(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,7 @@ public BulkRequest getRequest() {
}

private long estimateBulkOperationSize(BulkOperationWrapper bulkOperation) {

Object anyDocument;

if (bulkOperation.getBulkOperation().isIndex()) {
anyDocument = bulkOperation.getBulkOperation().index().document();
} else if (bulkOperation.getBulkOperation().isCreate()) {
anyDocument = bulkOperation.getBulkOperation().create().document();
} else {
throw new UnsupportedOperationException("Only index or create operations are supported currently. " + bulkOperation);
}
final Object anyDocument = bulkOperation.getDocument();

if (anyDocument == null)
return OPERATION_OVERHEAD;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
Expand Down Expand Up @@ -34,14 +33,13 @@ public FailedBulkOperationConverter(final String pipelineName, final String plug
public DlqObject convertToDlqObject(final FailedBulkOperation failedBulkOperation) {

final BulkOperationWrapper bulkOperationWithHandle = failedBulkOperation.getBulkOperation();
final BulkOperation bulkOperation = bulkOperationWithHandle.getBulkOperation();
final BulkResponseItem bulkResponseItem = failedBulkOperation.getBulkResponseItem();

final Object document = convertDocumentToGenericMap(bulkOperation);
final Object document = convertDocumentToGenericMap(bulkOperationWithHandle);

final FailedDlqData.Builder failedDlqDataBuilder = FailedDlqData.builder()
.withIndex(bulkOperation.index().index())
.withIndexId(bulkOperation.index().id())
.withIndex(bulkOperationWithHandle.getIndex())
.withIndexId(bulkOperationWithHandle.getId())
.withDocument(document);

if (bulkResponseItem != null) {
Expand All @@ -61,8 +59,8 @@ public DlqObject convertToDlqObject(final FailedBulkOperation failedBulkOperatio
.build();
}

private Object convertDocumentToGenericMap(final BulkOperation bulkOperation) {
final SerializedJson document = (SerializedJson) bulkOperation.index().document();
private Object convertDocumentToGenericMap(final BulkOperationWrapper bulkOperation) {
final SerializedJson document = (SerializedJson) bulkOperation.getDocument();
final byte[] documentBytes = document.getSerializedJson();
final String jsonString = new String(documentBytes, StandardCharsets.UTF_8);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,34 @@

package org.opensearch.dataprepper.plugins.sink.opensearch;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
import org.opensearch.client.opensearch.core.bulk.DeleteOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.junit.jupiter.api.Test;

import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

public class BulkOperationWrapperTests {
private static final String ID = UUID.randomUUID().toString();
private static final String INDEX = UUID.randomUUID().toString();
private static final String DOCUMENT = UUID.randomUUID().toString();

private BulkOperation bulkOperation;

BulkOperationWrapper createObjectUnderTest(EventHandle eventHandle) {
bulkOperation = mock(BulkOperation.class);
BulkOperationWrapper createObjectUnderTest(final EventHandle eventHandle, BulkOperation aBulkOperation) {
bulkOperation = Objects.isNull(aBulkOperation) ? mock(BulkOperation.class) : aBulkOperation;
if (eventHandle == null) {
return new BulkOperationWrapper(bulkOperation);
}
Expand All @@ -25,16 +41,92 @@ BulkOperationWrapper createObjectUnderTest(EventHandle eventHandle) {

@Test
public void testConstructorWithOneArgument() {
BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(null);
BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(null, null);
assertThat(bulkOperationWithHandle.getBulkOperation(), equalTo(bulkOperation));
assertThat(bulkOperationWithHandle.getEventHandle(), equalTo(null));
}

@Test
public void testConstructorWithTwoArguments() {
EventHandle eventHandle = mock(EventHandle.class);
BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(eventHandle);
BulkOperationWrapper bulkOperationWithHandle = createObjectUnderTest(eventHandle, null);
assertThat(bulkOperationWithHandle.getBulkOperation(), equalTo(bulkOperation));
assertThat(bulkOperationWithHandle.getEventHandle(), equalTo(eventHandle));
}

@ParameterizedTest
@MethodSource("bulkOperationProvider")
public void testGetId(final BulkOperation bulkOperation) {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation);
assertThat(bulkOperationWrapper.getId(), equalTo(ID));
}

@Test
public void testGetIdUnsupportedAction() {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation());
assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getId);
}

@ParameterizedTest
@MethodSource("bulkOperationProvider")
public void testGetIndex(final BulkOperation bulkOperation) {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation);
assertThat(bulkOperationWrapper.getIndex(), equalTo(INDEX));
}

@Test
public void testGetIndexUnsupportedAction() {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation());
assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getIndex);
}

@ParameterizedTest
@MethodSource("bulkOperationProvider")
public void testGetDocument(final BulkOperation bulkOperation) {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, bulkOperation);
assertThat(bulkOperationWrapper.getDocument(), equalTo(DOCUMENT));
}

@Test
public void testGetDocumentUnsupportedAction() {
final BulkOperationWrapper bulkOperationWrapper = createObjectUnderTest(null, getDeleteBulkOperation());
assertThrows(UnsupportedOperationException.class, bulkOperationWrapper::getDocument);
}

private static Stream<Arguments> bulkOperationProvider() {
final IndexOperation indexOperation = new IndexOperation.Builder<>()
.id(ID)
.index(INDEX)
.document(DOCUMENT)
.build();
final BulkOperation indexBulkOperation = (BulkOperation) new BulkOperation.Builder()
.index(indexOperation)
.build();

final CreateOperation createOperation = new CreateOperation.Builder<>()
.id(ID)
.index(INDEX)
.document(DOCUMENT)
.build();
final BulkOperation createBulkOperation = (BulkOperation) new BulkOperation.Builder()
.create(createOperation)
.build();

return Stream.of(
Arguments.of(
indexBulkOperation,
createBulkOperation
)
);
}

private BulkOperation getDeleteBulkOperation() {
final DeleteOperation deleteOperation = new DeleteOperation.Builder()
.id(ID)
.index(INDEX)
.build();
return new BulkOperation.Builder()
.delete(deleteOperation)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.opensearch.client.opensearch._types.ErrorCause;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.core.bulk.CreateOperation;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson;
Expand Down Expand Up @@ -52,11 +53,22 @@ public void setup() {
failureMessage = UUID.randomUUID().toString();
failure = new Exception(failureMessage);
bulkOperation = mock(BulkOperation.class);

final IndexOperation indexOperation = mock(IndexOperation.class);
when(bulkOperation.index()).thenReturn(indexOperation);
when(indexOperation.index()).thenReturn(testIndex);
when(indexOperation.document()).thenReturn(document);
when(indexOperation.id()).thenReturn(testId);

final CreateOperation createOperation = mock(CreateOperation.class);
when(createOperation.index()).thenReturn(testIndex);
when(createOperation.document()).thenReturn(document);
when(createOperation.id()).thenReturn(testId);

when(bulkOperation.isIndex()).thenReturn(true);
when(bulkOperation.index()).thenReturn(indexOperation);
when(bulkOperation.isCreate()).thenReturn(false);
when(bulkOperation.create()).thenReturn(createOperation);

final ErrorCause errorCause = mock(ErrorCause.class);
bulkResponseItem = mock(BulkResponseItem.class);
when(bulkResponseItem.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR.getStatus());
Expand All @@ -80,6 +92,22 @@ public void testConvertToDlqObject() {
validateResponse(result, errorReason);
}

@Test
public void testConvertActionCreateToDlqObject() {
when(bulkOperation.isIndex()).thenReturn(false);
when(bulkOperation.isCreate()).thenReturn(true);

final FailedBulkOperation testData = FailedBulkOperation.builder()
.withBulkOperation(new BulkOperationWrapper(bulkOperation))
.withBulkResponseItem(bulkResponseItem)
.withFailure(failure)
.build();

final DlqObject result = converter.convertToDlqObject(testData);

validateResponse(result, errorReason);
}

@Test
public void testConvertToDlqObjectWithOnlyFailure() {
final FailedBulkOperation testData = FailedBulkOperation.builder()
Expand Down

0 comments on commit 831017a

Please sign in to comment.