From 37eea76e197ee9a40481dc3f0df02708da8312ba Mon Sep 17 00:00:00 2001 From: Chase <62891993+engechas@users.noreply.github.com> Date: Mon, 9 Oct 2023 13:40:02 -0500 Subject: [PATCH] Kafka drain timeout (#3454) * Add getDrainTimeout method to buffer interface. Add as configurable value for kafka buffer Signed-off-by: Chase Engelbrecht * Add unit tests Signed-off-by: Chase Engelbrecht * Move getDrainTimeout to default method in the interface, add test for it, disable SNS sink Signed-off-by: Chase Engelbrecht * Remove verification from non-mock Signed-off-by: Chase Engelbrecht --------- Signed-off-by: Chase Engelbrecht --- .../dataprepper/model/buffer/Buffer.java | 5 ++ .../dataprepper/model/buffer/BufferTest.java | 25 ++++++++++ .../parser/CircuitBreakingBuffer.java | 6 +++ .../dataprepper/pipeline/Pipeline.java | 8 +-- .../plugins/MultiBufferDecorator.java | 9 ++++ .../parser/CircuitBreakingBufferTest.java | 11 +++++ .../plugins/MultiBufferDecoratorTest.java | 49 ++++++++++++++++++- .../plugins/buffer/TestBuffer.java | 1 + .../plugins/kafka/buffer/KafkaBuffer.java | 9 ++++ .../configuration/KafkaBufferConfig.java | 9 ++++ .../plugins/kafka/buffer/KafkaBufferTest.java | 13 +++++ settings.gradle | 2 +- 12 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java index d77d99d14c..50c1085bd7 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java @@ -8,6 +8,7 @@ import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.record.Record; +import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -53,4 +54,8 @@ public interface Buffer> { void checkpoint(CheckpointState checkpointState); boolean isEmpty(); + + default Duration getDrainTimeout() { + return Duration.ZERO; + } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java new file mode 100644 index 0000000000..293a9d67bb --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.buffer; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; + +import java.time.Duration; + +import static org.mockito.Mockito.spy; + +public class BufferTest { + + @Test + public void testGetDrainTimeout() { + final Buffer> buffer = spy(Buffer.class); + + Assert.assertEquals(Duration.ZERO, buffer.getDrainTimeout()); + } +} diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java index c546bce5f8..403149f8d9 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/CircuitBreakingBuffer.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -71,4 +72,9 @@ public void checkpoint(final CheckpointState checkpointState) { public boolean isEmpty() { return buffer.isEmpty(); } + + @Override + public Duration getDrainTimeout() { + return buffer.getDrainTimeout(); + } } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java index f41ab33d7c..b60257d860 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/Pipeline.java @@ -266,9 +266,9 @@ public void execute() { * 6. Stopping the sink ExecutorService */ public synchronized void shutdown() { - LOG.info("Pipeline [{}] - Received shutdown signal with processor shutdown timeout {} and sink shutdown timeout {}." + - " Initiating the shutdown process", - name, processorShutdownTimeout, sinkShutdownTimeout); + LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " + + "and sink shutdown timeout {}. Initiating the shutdown process", + name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout); try { source.stop(); stopRequested.set(true); @@ -277,7 +277,7 @@ public synchronized void shutdown() { "proceeding with termination of process workers", name, ex); } - shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor"); + shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor"); processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown)); sinks.stream() diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java index 22cdf2a519..f7da240e5d 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/plugins/MultiBufferDecorator.java @@ -9,10 +9,12 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; /** * Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator @@ -55,4 +57,11 @@ public boolean isEmpty() { .map(Buffer::isEmpty) .allMatch(result -> result == true); } + + @Override + public Duration getDrainTimeout() { + return Stream.concat(Stream.of(primaryBuffer), secondaryBuffers.stream()) + .map(Buffer::getDrainTimeout) + .reduce(Duration.ZERO, Duration::plus); + } } \ No newline at end of file diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java index 0b9df89428..866f13aac2 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/CircuitBreakingBufferTest.java @@ -19,10 +19,12 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeoutException; import static org.hamcrest.MatcherAssert.assertThat; @@ -64,6 +66,15 @@ void constructor_should_throw_with_null_circuitBreaker() { assertThrows(NullPointerException.class, this::createObjectUnderTest); } + @Test + void getDrainTimeout_returns_buffer_drain_timeout() { + final Duration duration = Duration.ofMillis(new Random().nextLong()); + when(buffer.getDrainTimeout()).thenReturn(duration); + + final Duration result = createObjectUnderTest().getDrainTimeout(); + assertThat(result, equalTo(duration)); + } + @Nested class NoCircuitBreakerChecks { @AfterEach diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java index 3ceda2e08e..028e7dcf8a 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/plugins/MultiBufferDecoratorTest.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.record.Record; +import java.time.Duration; import java.util.AbstractMap; import java.util.Collection; import java.util.List; @@ -34,7 +35,8 @@ @ExtendWith(MockitoExtension.class) public class MultiBufferDecoratorTest { - private static final int TIMEOUT_MILLIS = new Random().nextInt(1000000) + 1; + private static final Random RANDOM = new Random(); + private static final int TIMEOUT_MILLIS = RANDOM.nextInt(1000000) + 1; @Mock private Buffer primaryBuffer; @@ -154,6 +156,51 @@ void isEmpty_MultipleSecondaryBuffers_OneNotEmpty() { verify(secondaryBuffer, times(2)).isEmpty(); } + @Test + void getDrainTimeout_NoSecondaryBuffers_ReturnsPrimaryBufferValue() { + final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong()); + when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout); + final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(0); + + final Duration result = multiBufferDecorator.getDrainTimeout(); + assertThat(result, equalTo(primaryBufferDrainTimeout)); + + verify(primaryBuffer).getDrainTimeout(); + } + + @Test + void getDrainTimeout_OneSecondaryBuffer_ReturnsSumOfDurations() { + final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong()); + when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout); + final Duration secondaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong()); + when(secondaryBuffer.getDrainTimeout()).thenReturn(secondaryBufferDrainTimeout); + final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(1); + + final Duration result = multiBufferDecorator.getDrainTimeout(); + assertThat(result, equalTo(primaryBufferDrainTimeout.plus(secondaryBufferDrainTimeout))); + + verify(primaryBuffer).getDrainTimeout(); + verify(secondaryBuffer).getDrainTimeout(); + } + + @Test + void getDrainTimeout_MultipleSecondaryBuffers_ReturnsSumOfDurations() { + final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong()); + when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout); + final Duration secondaryBufferDrainTimeout1 = Duration.ofMillis(RANDOM.nextLong()); + final Duration secondaryBufferDrainTimeout2 = Duration.ofMillis(RANDOM.nextLong()); + when(secondaryBuffer.getDrainTimeout()) + .thenReturn(secondaryBufferDrainTimeout1) + .thenReturn(secondaryBufferDrainTimeout2); + final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2); + + final Duration result = multiBufferDecorator.getDrainTimeout(); + assertThat(result, equalTo(primaryBufferDrainTimeout.plus(secondaryBufferDrainTimeout1).plus(secondaryBufferDrainTimeout2))); + + verify(primaryBuffer).getDrainTimeout(); + verify(secondaryBuffer, times(2)).getDrainTimeout(); + } + private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) { final List secondaryBuffers = IntStream.range(0, secondaryBufferCount) .mapToObj(i -> secondaryBuffer) diff --git a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/buffer/TestBuffer.java b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/buffer/TestBuffer.java index 0e4377d903..9cf365c2e7 100644 --- a/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/buffer/TestBuffer.java +++ b/data-prepper-plugins/common/src/test/java/org/opensearch/dataprepper/plugins/buffer/TestBuffer.java @@ -91,4 +91,5 @@ public boolean isEmpty() { public int size() { return buffer.size(); } + } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index c793041545..158cc8bee8 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -19,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ public class KafkaBuffer> extends AbstractBuffer { private final KafkaCustomProducer producer; private final AbstractBuffer innerBuffer; private final ExecutorService executorService; + private final Duration drainTimeout; @DataPrepperPluginConstructor public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory, @@ -49,6 +51,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false)); this.executorService = Executors.newFixedThreadPool(consumers.size()); consumers.forEach(this.executorService::submit); + + this.drainTimeout = kafkaBufferConfig.getDrainTimeout(); } @Override @@ -88,4 +92,9 @@ public boolean isEmpty() { // TODO: check Kafka topic is empty as well. return innerBuffer.isEmpty(); } + + @Override + public Duration getDrainTimeout() { + return drainTimeout; + } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java index 5dd1f68b42..1cab8d7133 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaBufferConfig.java @@ -7,11 +7,13 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; +import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.Optional; public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig { + private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30); @JsonProperty("bootstrap_servers") private List bootStrapServers; @@ -39,6 +41,9 @@ public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConf @Valid private AwsConfig awsConfig; + @JsonProperty("drain_timeout") + private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT; + public List getBootstrapServers() { if (Objects.nonNull(bootStrapServers)) { @@ -113,4 +118,8 @@ public String getClientDnsLookup() { public boolean getAcknowledgementsEnabled() { return false; } + + public Duration getDrainTimeout() { + return drainTimeout; + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index fd304d810d..37a9c6e8bf 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -33,12 +33,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.Objects; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeoutException; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -218,5 +221,15 @@ void test_kafkaBuffer_postProcess() { } + @Test + void test_kafkaBuffer_getDrainTimeout() { + final Duration duration = Duration.ofMillis(new Random().nextLong()); + when(bufferConfig.getDrainTimeout()).thenReturn(duration); + kafkaBuffer = createObjectUnderTest(); + final Duration result = kafkaBuffer.getDrainTimeout(); + assertThat(result, equalTo(duration)); + + verify(bufferConfig).getDrainTimeout(); + } } \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 03989827d0..e7963c1670 100644 --- a/settings.gradle +++ b/settings.gradle @@ -141,7 +141,7 @@ include 'data-prepper-plugins:buffer-common' include 'data-prepper-plugins:sqs-source' include 'data-prepper-plugins:cloudwatch-logs' include 'data-prepper-plugins:http-sink' -include 'data-prepper-plugins:sns-sink' +//include 'data-prepper-plugins:sns-sink' include 'data-prepper-plugins:prometheus-sink' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source'