diff --git a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java index cc8e5b5af..0d79a4c24 100644 --- a/RFS/src/main/java/com/rfs/common/OpenSearchClient.java +++ b/RFS/src/main/java/com/rfs/common/OpenSearchClient.java @@ -30,11 +30,15 @@ public class OpenSearchClient { private final RestClient client; public OpenSearchClient(ConnectionContext connectionContext) { - this.client = new RestClient(connectionContext); + this(new RestClient(connectionContext)); } public OpenSearchClient(ConnectionContext connectionContext, int maxConnections) { - this.client = new RestClient(connectionContext, maxConnections); + this(new RestClient(connectionContext, maxConnections)); + } + + OpenSearchClient(RestClient client) { + this.client = client; } /* diff --git a/RFS/src/test/java/com/rfs/common/OpenSearchClientTest.java b/RFS/src/test/java/com/rfs/common/OpenSearchClientTest.java new file mode 100644 index 000000000..5b2a81bc8 --- /dev/null +++ b/RFS/src/test/java/com/rfs/common/OpenSearchClientTest.java @@ -0,0 +1,113 @@ +package com.rfs.common; + +import java.util.Optional; + +import com.fasterxml.jackson.core.StreamReadFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.junit.jupiter.api.Test; + +import com.rfs.common.http.HttpResponse; +import com.rfs.tracing.IRfsContexts.ICheckedIdempotentPutRequestContext; +import lombok.SneakyThrows; +import reactor.core.publisher.Mono; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class OpenSearchClientTest { + private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder() + .enable(StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION) + .build(); + + @Test + void testCreateIndex() { + // Setup + var checkIfExistsResponse = new HttpResponse(404, "", null, "does not exist"); + var createdItemRawJson = "{\"created\":\"yup!\"}"; + var createItemResponse = new HttpResponse(200, "", null, createdItemRawJson); + + var restClient = mock(RestClient.class); + when(restClient.getAsync(any(), any())).thenReturn(Mono.just(checkIfExistsResponse)); + when(restClient.putAsync(any(), any(), any())).thenReturn(Mono.just(createItemResponse)); + + // Action + var rawJson = "{ }"; + var result = createIndex(restClient, rawJson); + + // Assertions + assertThat(result.get().toPrettyString(), containsString(rawJson)); + // The interface is to send back the passed json if on success + assertThat(result.get().toPrettyString(), not(containsString(createdItemRawJson))); + } + + @Test + void testCreateIndex_alreadyExists() { + var checkIfExistsResponse = new HttpResponse(200, "", null, "I exist!"); + + var restClient = mock(RestClient.class); + when(restClient.getAsync(any(), any())).thenReturn(Mono.just(checkIfExistsResponse)); + + var rawJson = "{ }"; + var result = createIndex(restClient, rawJson); + + assertThat(result, equalTo(Optional.empty())); + } + + @Test + void testCreateIndex_errorOnCreation_retried() { + // Setup + var checkIfExistsResponse = new HttpResponse(404, "", null, "does not exist"); + var createdItemRawJson = "{\"error\":\"unauthorized\"}"; + var createItemResponse = new HttpResponse(403, "", null, createdItemRawJson); + + var restClient = mock(RestClient.class); + when(restClient.getAsync(any(), any())).thenReturn(Mono.just(checkIfExistsResponse)); + when(restClient.putAsync(any(), any(), any())).thenReturn(Mono.just(createItemResponse)); + + // Action + var rawJson = "{ }"; + var exception = assertThrows(IllegalStateException.class, () -> createIndex(restClient, rawJson)); + + // Assertions + assertThat(exception.getClass().getName(), containsString("RetryExhaustedException")); + assertThat(exception.getMessage(), containsString("Retries exhausted")); + assertThat(exception.getCause().getMessage(), containsString("403")); + assertThat(exception.getCause().getMessage(), containsString("unauthorized")); + // The interface is to send back the passed json if on success + } + + @Test + void testCreateIndex_errorOnCreation_notRetriedOnBadRequest() { + // Setup + var checkIfExistsResponse = new HttpResponse(404, "", null, "does not exist"); + var createdItemRawJson = "{\"error\":\"illegal_argument_exception\"}"; + var createItemResponse = new HttpResponse(400, "", null, createdItemRawJson); + + var restClient = mock(RestClient.class); + when(restClient.getAsync(any(), any())).thenReturn(Mono.just(checkIfExistsResponse)); + when(restClient.putAsync(any(), any(), any())).thenReturn(Mono.just(createItemResponse)); + + // Action + var rawJson = "{ }"; + var exception = assertThrows(InvalidResponse.class, () -> createIndex(restClient, rawJson)); + + // Assertions + assertThat(exception.getMessage(), containsString("illegal_argument_exception")); + } + + @SneakyThrows + private Optional createIndex(RestClient restClient, String rawJson) { + var openSearchClient = new OpenSearchClient(restClient); + + var body = (ObjectNode) OBJECT_MAPPER.readTree(rawJson); + return openSearchClient.createIndex("indexName", body, mock(ICheckedIdempotentPutRequestContext.class)); + } +}