Skip to content

Commit

Permalink
Add setting to enable/disable estimation with compression
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Jul 5, 2023
1 parent 398f10b commit 4c186d3
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConstants;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexType;
import org.apache.commons.lang3.RandomStringUtils;

import static org.hamcrest.CoreMatchers.is;
import static org.mockito.Mockito.when;

import javax.ws.rs.HttpMethod;
Expand Down Expand Up @@ -215,16 +217,19 @@ public void testInstantiateSinkRawSpanReservedAliasAlreadyUsedAsIndex() throws I
RuntimeException.class, () -> sink.doInitialize());
}

@Test
public void testOutputRawSpanDefault() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputRawSpanDefault(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
final String testDoc1 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_2);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData1 = mapper.readValue(testDoc1, Map.class);
@SuppressWarnings("unchecked") final Map<String, Object> expData2 = mapper.readValue(testDoc2, Map.class);

final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);

Expand Down Expand Up @@ -275,20 +280,24 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(773.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(773.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 773.0 : 2058.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
}

@Test
public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputRawSpanWithDLQ(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
// TODO: write test case
final String testDoc1 = readDocFromFile("raw-span-error.json");
final String testDoc2 = readDocFromFile(DEFAULT_RAW_SPAN_FILE_1);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc2, Map.class);

final List<Record<Event>> testRecords = Arrays.asList(jsonStringToRecord(testDoc1), jsonStringToRecord(testDoc2));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
// generate temporary directory for dlq file
final File tempDirectory = Files.createTempDirectory("").toFile();
// add dlq file path into setting
Expand Down Expand Up @@ -331,8 +340,9 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(1066.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(1066.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 1066.0 : 2072.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

}

Expand All @@ -355,14 +365,17 @@ public void testInstantiateSinkServiceMapDefault() throws IOException {
}
}

@Test
public void testOutputServiceMapDefault() throws IOException, InterruptedException {
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,true", "false,false"})
public void testOutputServiceMapDefault(final boolean estimateBulkSizeUsingCompression,
final boolean isRequestCompressionEnabled) throws IOException, InterruptedException {
final String testDoc = readDocFromFile(DEFAULT_SERVICE_MAP_FILE);
final ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked") final Map<String, Object> expData = mapper.readValue(testDoc, Map.class);

final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(testDoc));
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null);
final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_SERVICE_MAP.getValue(), null, null,
estimateBulkSizeUsingCompression, isRequestCompressionEnabled);
OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
sink.output(testRecords);
final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_SERVICE_MAP);
Expand All @@ -388,8 +401,9 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.size(), equalTo(3));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(0).getValue(), closeTo(1.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(366.0, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(366.0, 0));
final double expectedBulkRequestSizeBytes = isRequestCompressionEnabled && estimateBulkSizeUsingCompression ? 366.0 : 265.0;
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(1).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));
MatcherAssert.assertThat(bulkRequestSizeBytesMetrics.get(2).getValue(), closeTo(expectedBulkRequestSizeBytes, 0));

// Check restart for index already exists
sink = createObjectUnderTest(pluginSetting, true);
Expand Down Expand Up @@ -884,6 +898,15 @@ private PluginSetting generatePluginSetting(final String indexType, final String
return generatePluginSettingByMetadata(metadata);
}

/**
 * Builds a {@link PluginSetting} for tests that need to control the bulk-size
 * estimation strategy: the metadata additionally carries the
 * estimate-bulk-size-using-compression flag and the request-compression flag.
 */
private PluginSetting generatePluginSetting(final String indexType, final String indexAlias,
                                            final String templateFilePath, final boolean estimateBulkSizeUsingCompression,
                                            final boolean requestCompressionEnabled) {
    final Map<String, Object> configurationMetadata = initializeConfigurationMetadata(indexType, indexAlias, templateFilePath);
    configurationMetadata.put(IndexConfiguration.ESTIMATE_BULK_SIZE_USING_COMPRESSION, estimateBulkSizeUsingCompression);
    configurationMetadata.put(ConnectionConfiguration.REQUEST_COMPRESSION_ENABLED, requestCompressionEnabled);
    return generatePluginSettingByMetadata(configurationMetadata);
}

private PluginSetting generatePluginSetting(final String indexType, final String indexAlias,
final String templateType,
final String templateFilePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,14 @@ private void doInitializeInternal() throws IOException {
}
indexManager.setupIndex();

if (openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled()) {
final boolean isEstimateBulkSizeUsingCompression = openSearchSinkConfig.getIndexConfiguration().isEstimateBulkSizeUsingCompression();
final boolean isRequestCompressionEnabled = openSearchSinkConfig.getConnectionConfiguration().isRequestCompressionEnabled();
if (isEstimateBulkSizeUsingCompression && isRequestCompressionEnabled) {
bulkRequestSupplier = () -> new JavaClientAccumulatingCompressedBulkRequest(new BulkRequest.Builder(), bulkSize);
} else if (isEstimateBulkSizeUsingCompression) {
LOG.warn("Estimate bulk request size using compression was enabled but request compression is disabled. " +
"Estimating bulk request size without compression.");
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
} else {
bulkRequestSupplier = () -> new JavaClientAccumulatingUncompressedBulkRequest(new BulkRequest.Builder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.sink.opensearch.bulk;

import com.google.common.annotations.VisibleForTesting;
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.slf4j.Logger;
Expand All @@ -22,13 +23,16 @@
public class JavaClientAccumulatingCompressedBulkRequest implements AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> {
private static final Logger LOG = LoggerFactory.getLogger(JavaClientAccumulatingCompressedBulkRequest.class);

private static final int MAX_SAMPLE_TIMES = 2;

private final List<BulkOperationWrapper> bulkOperations;
private long sampleSize;
private final long targetBulkSize;
private BulkRequest.Builder bulkRequestBuilder;
private long currentBulkSize = 0L;
private double sampledOperationSize = 0.0;
private int operationCount = 0;
private int timesSampled = 0;
private BulkRequest builtRequest;

public JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bulkRequestBuilder, final long targetBulkSize) {
Expand All @@ -39,6 +43,14 @@ public JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bul
this.sampleSize = 10;
}

/**
 * Test-only constructor that additionally lets the caller choose the sampling
 * interval (number of operations between compressed-size measurements) instead
 * of relying on the production default.
 */
@VisibleForTesting
JavaClientAccumulatingCompressedBulkRequest(final BulkRequest.Builder bulkRequestBuilder, final long targetBulkSize, final int sampleSize) {
    this.bulkRequestBuilder = bulkRequestBuilder;
    this.targetBulkSize = targetBulkSize;
    this.sampleSize = sampleSize;
    this.bulkOperations = new ArrayList<>();
}

@Override
public long estimateSizeInBytesWithDocument(BulkOperationWrapper documentOrOperation) {
return currentBulkSize + (long) sampledOperationSize;
Expand All @@ -51,10 +63,11 @@ public void addOperation(BulkOperationWrapper bulkOperation) {
operationCount++;
bulkOperations.add(bulkOperation);

if (bulkOperations.size() == sampleSize) {
if (timesSampled < MAX_SAMPLE_TIMES && bulkOperations.size() == sampleSize) {
currentBulkSize = estimateBulkSize();
sampledOperationSize = (double) currentBulkSize / (double) bulkOperations.size();
updateTargetSampleSize();
timesSampled++;
} else {
currentBulkSize += sampledOperationSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.dataprepper.plugins.sink.opensearch.BulkOperationWrapper;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public class IndexConfiguration {
public static final String NUM_SHARDS = "number_of_shards";
public static final String NUM_REPLICAS = "number_of_replicas";
public static final String BULK_SIZE = "bulk_size";
public static final String BULK_SAMPLE_SIZE = "bulk_sample_size";
public static final String ESTIMATE_BULK_SIZE_USING_COMPRESSION = "estimate_bulk_size_using_compression";
public static final String FLUSH_TIMEOUT = "flush_timeout";
public static final String DOCUMENT_ID_FIELD = "document_id_field";
public static final String ROUTING_FIELD = "routing_field";
public static final String ISM_POLICY_FILE = "ism_policy_file";
public static final long DEFAULT_BULK_SIZE = 5L;
public static final int DEFAULT_BULK_SAMPLE_SIZE = 5000;
public static final boolean DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION = false;
public static final long DEFAULT_FLUSH_TIMEOUT = 60_000L;
public static final String ACTION = "action";
public static final String S3_AWS_REGION = "s3_aws_region";
Expand All @@ -59,7 +59,7 @@ public class IndexConfiguration {
private final String documentIdField;
private final String routingField;
private final long bulkSize;
private final int bulkSampleSize;
private final boolean estimateBulkSizeUsingCompression;
private final long flushTimeout;
private final Optional<String> ismPolicyFile;
private final String action;
Expand Down Expand Up @@ -106,7 +106,7 @@ private IndexConfiguration(final Builder builder) {
}
this.indexAlias = indexAlias;
this.bulkSize = builder.bulkSize;
this.bulkSampleSize = builder.bulkSampleSize;
this.estimateBulkSizeUsingCompression = builder.estimateBulkSizeUsingCompression;
this.flushTimeout = builder.flushTimeout;
this.routingField = builder.routingField;

Expand Down Expand Up @@ -157,8 +157,9 @@ public static IndexConfiguration readIndexConfig(final PluginSetting pluginSetti
builder = builder.withNumReplicas(pluginSetting.getIntegerOrDefault(NUM_REPLICAS, 0));
final Long batchSize = pluginSetting.getLongOrDefault(BULK_SIZE, DEFAULT_BULK_SIZE);
builder = builder.withBulkSize(batchSize);
final int bulkSampleSize = pluginSetting.getIntegerOrDefault(BULK_SAMPLE_SIZE, DEFAULT_BULK_SAMPLE_SIZE);
builder = builder.withBulkSampleSize(bulkSampleSize);
final boolean estimateBulkSizeUsingCompression =
pluginSetting.getBooleanOrDefault(ESTIMATE_BULK_SIZE_USING_COMPRESSION, DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION);
builder = builder.withEstimateBulkSizeUsingCompression(estimateBulkSizeUsingCompression);
final long flushTimeout = pluginSetting.getLongOrDefault(FLUSH_TIMEOUT, DEFAULT_FLUSH_TIMEOUT);
builder = builder.withFlushTimeout(flushTimeout);
final String documentId = pluginSetting.getStringOrDefault(DOCUMENT_ID_FIELD, null);
Expand Down Expand Up @@ -227,11 +228,10 @@ public long getBulkSize() {
return bulkSize;
}

public int getBulkSampleSize() {
return bulkSampleSize;
public boolean isEstimateBulkSizeUsingCompression() {
return estimateBulkSizeUsingCompression;
}


public long getFlushTimeout() {
return flushTimeout;
}
Expand Down Expand Up @@ -320,7 +320,7 @@ public static class Builder {
private String routingField;
private String documentIdField;
private long bulkSize = DEFAULT_BULK_SIZE;
private int bulkSampleSize = DEFAULT_BULK_SAMPLE_SIZE;
private boolean estimateBulkSizeUsingCompression = DEFAULT_ESTIMATE_BULK_SIZE_USING_COMPRESSION;
private long flushTimeout = DEFAULT_FLUSH_TIMEOUT;
private Optional<String> ismPolicyFile;
private String action;
Expand Down Expand Up @@ -374,8 +374,8 @@ public Builder withBulkSize(final long bulkSize) {
return this;
}

public Builder withBulkSampleSize(final int bulkSampleSize) {
this.bulkSampleSize = bulkSampleSize;
public Builder withEstimateBulkSizeUsingCompression(final boolean estimateBulkSizeUsingCompression) {
this.estimateBulkSizeUsingCompression = estimateBulkSizeUsingCompression;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -48,7 +47,7 @@ void setUp() {
}

private JavaClientAccumulatingCompressedBulkRequest createObjectUnderTest() {
return new JavaClientAccumulatingCompressedBulkRequest(bulkRequestBuilder, 1);
return new JavaClientAccumulatingCompressedBulkRequest(bulkRequestBuilder, 5 * 1024 * 1024, 1);
}

@Test
Expand Down Expand Up @@ -111,7 +110,8 @@ void getEstimatedSizeInBytes_returns_the_operation_overhead_if_requests_have_no_
objectUnderTest.addOperation(new BulkOperationWrapper(createBulkOperation(emptyDocument)));
}

assertThat((double) objectUnderTest.getEstimatedSizeInBytes(), closeTo(expectedDocumentSize, operationCount));
final long expectedSize = expectedDocumentSize * operationCount;
assertThat(objectUnderTest.getEstimatedSizeInBytes(), equalTo(expectedSize));
}

@Test
Expand Down
Loading

0 comments on commit 4c186d3

Please sign in to comment.