diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index fd8bf7d82f..c988b3ea00 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -48,6 +48,8 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType; import org.apache.commons.lang3.RandomStringUtils; + +import static org.hamcrest.CoreMatchers.is; import static org.mockito.Mockito.when; import javax.ws.rs.HttpMethod; @@ -215,8 +217,10 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I RuntimeException.class, () -> sink.doInitialize()); } - @Test - public void testOutputRawSpanDefault() throws IOException, InterruptedException { + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression, + final boolean isRequestCompressionEnabled) throws IOException, InterruptedException { final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2); final ObjectMapper mapper = new ObjectMapper(); @@ -224,7 +228,8 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException @SuppressWarnings("unchecked") final Map expData2 = mapper.readValue(testDoc2, Map.class); final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, + estimateBulkSizeUsingCompression, isRequestCompressionEnabled); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); @@ -275,12 +280,15 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(773.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(773.0, 0)); + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 773.0 : 2058.0; + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } - @Test - public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException { + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression, + final boolean isRequestCompressionEnabled) throws IOException, InterruptedException { // TODO: write test case final String testDoc1 = readDocFromFile("raw-span-error.json"); final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1); @@ -288,7 +296,8 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc2, Map.class); final List> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null, + estimateBulkSizeUsingCompression, isRequestCompressionEnabled); // generate temporary directory for dlq file final File tempDirectory = Files.createTempDirectory("").toFile(); // add dlq file path into setting @@ -331,8 +340,9 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(1066.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(1066.0, 0)); + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1066.0 : 2072.0; + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); } @@ -355,14 +365,17 @@ public void testInstantiateSinkServiceMapDefault() throws IOException { } } - @Test - public void testOutputServiceMapDefault() throws IOException, InterruptedException { + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,true", "false,false"}) + public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression, + final boolean isRequestCompressionEnabled) throws IOException, InterruptedException { final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE); final ObjectMapper mapper = new ObjectMapper(); @SuppressWarnings("unchecked") final Map expData = mapper.readValue(testDoc, Map.class); final List> testRecords = Collections.singletonList(jsonStringToRecord(testDoc)); - final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null); + final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null, + estimateBulkSizeUsingCompression, isRequestCompressionEnabled); OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP); @@ -388,8 +401,9 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3)); MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(366.0, 0)); - MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(366.0, 0)); + final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 366.0 : 265.0; + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); + MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0)); // Check restart for index already exists sink = createObjectUnderTest(pluginSetting, true); @@ -884,6 +898,15 @@ private PluginSetting generatePluginSetting(final String indexType, final String return generatePluginSettingByMetadata(metadata); } + private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, + final String templateFilePath, final boolean estimateBulkSizeUsingCompression, + final boolean requestCompressionEnabled) { + final Map metadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath); + metadata.put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, estimateBulkSizeUsingCompression); + metadata.put(ConnectionConfiguration.REQUEST_COMPRESSION_ENABLED, requestCompressionEnabled); + return generatePluginSettingByMetadata(metadata); + } + private PluginSetting generatePluginSetting(final String indexType, final String indexAlias, final String templateType, final String templateFilePath) { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 72a277ce4f..09be31c1ae 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -193,8 +193,14 @@ private void doInitializeInternal() throws IOException { } indexManager.setupIndex(); - if (openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled()) { + final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression(); + final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled(); + if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) { bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize); + } else if (isEstimateBulkSizeUsingCompression) { + LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " + + "Estimating bulk request size without compression."); + bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); } else { bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java index adcc33d926..e61168a914 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; +import com.google.common.annotations.VisibleForTesting; import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper; import org.opensearch.client.opensearch.core.BulkRequest; import org.slf4j.Logger; @@ -22,6 +23,8 @@ public class JavaClientAccumulatingCompressedBulkRequest implements AccumulatingBulkRequest { private static final Logger LOG = LoggerFactory.getLogger(JavaClientAccumulatingCompressedBulkRequest.class); + private static final int MAX_SAMPLE_TIMES = 2; + private final List bulkOperations; private long sampleSize; private final long targetBulkSize; @@ -29,6 +32,7 @@ public class JavaClientAccumulatingCompressedBulkRequest implements Accumulating private long currentBulkSize = 0L; private double sampledOperationSize = 0.0; private int operationCount = 0; + private int timesSampled = 0; private BulkRequest builtRequest; public JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bulkRequestBuilder, final long targetBulkSize) { @@ -39,6 +43,14 @@ public JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bul this.sampleSize = 10; } + @VisibleForTesting + JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bulkRequestBuilder, final long targetBulkSize, final int sampleSize) { + this.bulkRequestBuilder = bulkRequestBuilder; + bulkOperations = new ArrayList<>(); + this.targetBulkSize = targetBulkSize; + this.sampleSize = sampleSize; + } + @Override public long estimateSizeInBytesWithDocument(BulkOperationWrapper documentOrOperation) { return currentBulkSize + (long) sampledOperationSize; @@ -51,10 +63,11 @@ public void addOperation(BulkOperationWrapper bulkOperation) { operationCount++; bulkOperations.add(bulkOperation); - if (bulkOperations.size() == sampleSize) { + if (timesSampled < MAX_SAMPLE_TIMES && bulkOperations.size() == sampleSize) { currentBulkSize = estimateBulkSize(); sampledOperationSize = (double) currentBulkSize / (double) bulkOperations.size(); updateTargetSampleSize(); + timesSampled++; } else { currentBulkSize += sampledOperationSize; } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java index cfcab9fa19..af340a5852 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequest.java @@ -2,7 +2,6 @@ import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper; -import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index 8b1728a67f..02e9f6ebb6 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -36,13 +36,13 @@ public class IndexConfiguration { public static final String NUM_SHARDS = "number_of_shards"; public static final String NUM_REPLICAS = "number_of_replicas"; public static final String BULK_SIZE = "bulk_size"; - public static final String BULK_SAMPLE_SIZE = "bulk_sample_size"; + public static final String ESTIMATE_BULK_SIZE_USING_COMPRESSION = "estimate_bulk_size_using_compression"; public static final String FLUSH_TIMEOUT = "flush_timeout"; public static final String DOCUMENT_ID_FIELD = "document_id_field"; public static final String ROUTING_FIELD = "routing_field"; public static final String ISM_POLICY_FILE = "ism_policy_file"; public static final long DEFAULT_BULK_SIZE = 5L; - public static final int DEFAULT_BULK_SAMPLE_SIZE = 5000; + public static final boolean DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION = false; public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L; public static final String ACTION = "action"; public static final String S3_AWS_REGION = "s3_aws_region"; @@ -59,7 +59,7 @@ public class IndexConfiguration { private final String documentIdField; private final String routingField; private final long bulkSize; - private final int bulkSampleSize; + private final boolean estimateBulkSizeUsingCompression; private final long flushTimeout; private final Optional ismPolicyFile; private final String action; @@ -106,7 +106,7 @@ private IndexConfiguration(final Builder builder) { } this.indexAlias = indexAlias; this.bulkSize = builder.bulkSize; - this.bulkSampleSize = builder.bulkSampleSize; + this.estimateBulkSizeUsingCompression = builder.estimateBulkSizeUsingCompression; this.flushTimeout = builder.flushTimeout; this.routingField = builder.routingField; @@ -157,8 +157,9 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0)); final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE); builder = builder.withBulkSize(batchSize); - final int bulkSampleSize = pluginSetting.getIntegerOrDefault(BULK_SAMPLE_SIZE, DEFAULT_BULK_SAMPLE_SIZE); - builder = builder.withBulkSampleSize(bulkSampleSize); + final boolean estimateBulkSizeUsingCompression = + pluginSetting.getBooleanOrDefault(ESTIMATE_BULK_SIZE_USING_COMPRESSION, DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION); + builder = builder.withEstimateBulkSizeUsingCompression(estimateBulkSizeUsingCompression); final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT); builder = builder.withFlushTimeout(flushTimeout); final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null); @@ -227,11 +228,10 @@ public long getBulkSize() { return bulkSize; } - public int getBulkSampleSize() { - return bulkSampleSize; + public boolean isEstimateBulkSizeUsingCompression() { + return estimateBulkSizeUsingCompression; } - public long getFlushTimeout() { return flushTimeout; } @@ -320,7 +320,7 @@ public static class Builder { private String routingField; private String documentIdField; private long bulkSize = DEFAULT_BULK_SIZE; - private int bulkSampleSize = DEFAULT_BULK_SAMPLE_SIZE; + private boolean estimateBulkSizeUsingCompression = DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION; private long flushTimeout = DEFAULT_FLUSH_TIMEOUT; private Optional ismPolicyFile; private String action; @@ -374,8 +374,8 @@ public Builder withBulkSize(final long bulkSize) { return this; } - public Builder withBulkSampleSize(final int bulkSampleSize) { - this.bulkSampleSize = bulkSampleSize; + public Builder withEstimateBulkSizeUsingCompression(final boolean estimateBulkSizeUsingCompression) { + this.estimateBulkSizeUsingCompression = estimateBulkSizeUsingCompression; return this; } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequestTest.java index 013913dd57..ea3e874e82 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequestTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingCompressedBulkRequestTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.sameInstance; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.closeTo; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -48,7 +47,7 @@ void setUp() { } private JavaClientAccumulatingCompressedBulkRequest createObjectUnderTest() { - return new JavaClientAccumulatingCompressedBulkRequest(bulkRequestBuilder, 1); + return new JavaClientAccumulatingCompressedBulkRequest(bulkRequestBuilder, 5 * 1024 * 1024, 1); } @Test @@ -111,7 +110,8 @@ void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_ objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(emptyDocument))); } - assertThat((double) objectUnderTest.getEstimatedSizeInBytes(), closeTo(expectedDocumentSize, operationCount)); + final long expectedSize = expectedDocumentSize * operationCount; + assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize)); } @Test diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java new file mode 100644 index 0000000000..a1ea5159f0 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/JavaClientAccumulatingUncompressedBulkRequestTest.java @@ -0,0 +1,222 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.opensearch.bulk; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.IndexOperation; +import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class JavaClientAccumulatingUncompressedBulkRequestTest { + + private BulkRequest.Builder bulkRequestBuilder; + + @BeforeEach + void setUp() { + bulkRequestBuilder = mock(BulkRequest.Builder.class); + + when(bulkRequestBuilder.operations(any(BulkOperation.class))) + .thenReturn(bulkRequestBuilder); + + } + + private JavaClientAccumulatingUncompressedBulkRequest createObjectUnderTest() { + return new JavaClientAccumulatingUncompressedBulkRequest(bulkRequestBuilder); + } + + @Test + void getOperationCount_returns_0_if_no_interactions() { + assertThat(createObjectUnderTest().getOperationsCount(), equalTo(0)); + } + + @Test + void getOperations_returns_empty_list_if_no_interactions() { + assertThat(createObjectUnderTest().getOperations(), + equalTo(Collections.emptyList())); + } + + @Test + void getOperations_returns_unmodifiable_list() { + final List operations = createObjectUnderTest().getOperations(); + + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocument())); + assertThrows(UnsupportedOperationException.class, () -> operations.add(bulkOperation)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getOperationsCount_returns_the_correct_operation_count(final int operationCount) { + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + for (int i = 0; i < operationCount; i++) { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocument())); + objectUnderTest.addOperation(bulkOperation); + } + + assertThat(objectUnderTest.getOperationsCount(), equalTo(operationCount)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getEstimatedSizeInBytes_returns_the_current_size(final int operationCount) { + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + final long arbitraryDocumentSize = 175; + for (int i = 0; i < operationCount; i++) { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocumentWithLength(arbitraryDocumentSize))); + objectUnderTest.addOperation(bulkOperation); + } + + final long expectedSize = operationCount * (arbitraryDocumentSize + JavaClientAccumulatingUncompressedBulkRequest.OPERATION_OVERHEAD); + assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize)); + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 10}) + void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_documents(final int operationCount) { + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + for (int i = 0; i < operationCount; i++) { + objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(null))); + } + + final long expectedSize = operationCount * JavaClientAccumulatingUncompressedBulkRequest.OPERATION_OVERHEAD; + assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize)); + } + + @Test + void getOperationAt_returns_the_correct_index() { + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + + List knownOperations = new ArrayList<>(); + + for (int i = 0; i < 7; i++) { + BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocument())); + objectUnderTest.addOperation(bulkOperation); + knownOperations.add(bulkOperation); + } + + for (int i = 0; i < 7; i++) { + assertThat(objectUnderTest.getOperationAt(i), equalTo(knownOperations.get(i))); + } + } + + @ParameterizedTest + @ValueSource(longs = {0, 1, 2, 10, 50, 100}) + void estimateSizeInBytesWithDocument_on_new_object_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { + final SizedDocument document = generateDocumentWithLength(inputDocumentSize); + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(document)); + + assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation), + equalTo(inputDocumentSize + JavaClientAccumulatingUncompressedBulkRequest.OPERATION_OVERHEAD)); + } + + @ParameterizedTest + @ValueSource(longs = {0, 1, 2, 10, 50, 100}) + void estimateSizeInBytesWithDocument_on_request_with_operations_returns_estimated_document_size_plus_operation_overhead(long inputDocumentSize) { + final SizedDocument document = generateDocumentWithLength(inputDocumentSize); + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(document)); + + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(generateDocumentWithLength(inputDocumentSize)))); + + final long expectedSize = 2 * (inputDocumentSize + JavaClientAccumulatingUncompressedBulkRequest.OPERATION_OVERHEAD); + assertThat(objectUnderTest.estimateSizeInBytesWithDocument(bulkOperation), + equalTo(expectedSize)); + } + + @Test + void estimateSizeInBytesWithDocument_on_new_object_returns_operation_overhead_if_no_document() { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(null)); + + assertThat(createObjectUnderTest().estimateSizeInBytesWithDocument(bulkOperation), + equalTo((long) JavaClientAccumulatingUncompressedBulkRequest.OPERATION_OVERHEAD)); + } + + @Test + void addOperation_adds_operation_to_the_BulkRequestBuilder() { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(generateDocument())); + + createObjectUnderTest().addOperation(bulkOperation); + + verify(bulkRequestBuilder).operations(bulkOperation.getBulkOperation()); + } + + @Test + void addOperation_throws_when_BulkOperation_is_not_an_index_request() { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(mock(BulkOperation.class)); + + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThrows(UnsupportedOperationException.class, () -> objectUnderTest.addOperation(bulkOperation)); + } + + @Test + void addOperation_throws_when_document_is_not_JsonSize() { + final BulkOperationWrapper bulkOperation = new BulkOperationWrapper(createBulkOperation(UUID.randomUUID().toString())); + + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.addOperation(bulkOperation)); + } + + @Test + void getRequest_returns_BulkRequestBuilder_build() { + BulkRequest expectedBulkRequest = mock(BulkRequest.class); + when(bulkRequestBuilder.build()).thenReturn(expectedBulkRequest); + + assertThat(createObjectUnderTest().getRequest(), equalTo(expectedBulkRequest)); + } + + @Test + void getRequest_called_multiple_times_only_builds_once_and_reuses_the_built_request() { + BulkRequest expectedBulkRequest = mock(BulkRequest.class); + when(bulkRequestBuilder.build()).thenReturn(expectedBulkRequest); + + final JavaClientAccumulatingUncompressedBulkRequest objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getRequest(), equalTo(expectedBulkRequest)); + assertThat(objectUnderTest.getRequest(), sameInstance(objectUnderTest.getRequest())); + + verify(bulkRequestBuilder, times(1)).build(); + } + + private BulkOperation createBulkOperation(Object document) { + final IndexOperation indexOperation = mock(IndexOperation.class); + when(indexOperation.document()).thenReturn(document); + final BulkOperation bulkOperation = mock(BulkOperation.class); + when(bulkOperation.isIndex()).thenReturn(true); + when(bulkOperation.index()).thenReturn(indexOperation); + + return bulkOperation; + } + + private SizedDocument generateDocument() { + return generateDocumentWithLength(10L); + } + + private SizedDocument generateDocumentWithLength(long documentLength) { + final SizedDocument sizedDocument = mock(SizedDocument.class); + when(sizedDocument.getDocumentSize()).thenReturn(documentLength); + return sizedDocument; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java index e2bb31356b..49a3465568 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfigurationTests.java @@ -282,6 +282,7 @@ public void testReadIndexConfig_RawIndexType() { assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); assertEquals(5, indexConfiguration.getBulkSize()); assertEquals(60_000L, indexConfiguration.getFlushTimeout()); + assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals("spanId", indexConfiguration.getDocumentIdField()); } @@ -306,6 +307,7 @@ public void testReadIndexConfig_ServiceMapIndexType() { assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); assertEquals(5, indexConfiguration.getBulkSize()); assertEquals(60_000L, indexConfiguration.getFlushTimeout()); + assertEquals(false, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals("hashId", indexConfiguration.getDocumentIdField()); } @@ -319,12 +321,14 @@ public void testReadIndexConfigCustom() { final String testIdField = "someId"; final PluginSetting pluginSetting = generatePluginSetting( null, testIndexAlias, defaultTemplateFilePath, testBulkSize, testFlushTimeout, testIdField); + pluginSetting.getSettings().put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, true); final IndexConfiguration indexConfiguration = IndexConfiguration.readIndexConfig(pluginSetting); assertEquals(IndexType.CUSTOM, indexConfiguration.getIndexType()); assertEquals(testIndexAlias, indexConfiguration.getIndexAlias()); assertFalse(indexConfiguration.getIndexTemplate().isEmpty()); assertEquals(testBulkSize, indexConfiguration.getBulkSize()); assertEquals(testFlushTimeout, indexConfiguration.getFlushTimeout()); + assertEquals(true, indexConfiguration.isEstimateBulkSizeUsingCompression()); assertEquals(testIdField, indexConfiguration.getDocumentIdField()); }