From 837e9d320df80ee4c1e42c896547ca09d68a381a Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Mon, 7 Oct 2024 17:13:54 -0700 Subject: [PATCH] Add retry to make sure source is not shutdown when exceptions are thrown on the main thread Signed-off-by: Souvik Bose --- .../kinesis/source/KinesisService.java | 27 ++++++++++++++++--- .../configuration/KinesisSourceConfig.java | 10 +++++++ .../kinesis/source/KinesisServiceTest.java | 2 ++ .../KinesisSourceConfigTest.java | 8 +++++- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 4ed15833f6..d18bbd9a8d 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -116,6 +116,11 @@ public void start(final Buffer> buffer) { public void shutDown() { LOG.info("Stop request received for Kinesis Source"); + if (scheduler == null) { + LOG.info("Scheduler not initialized!!"); + return; + } + Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); LOG.info("Waiting up to {} seconds for shutdown to complete.", GRACEFUL_SHUTDOWN_WAIT_INTERVAL_SECONDS); try { @@ -128,8 +133,22 @@ public void shutDown() { } public Scheduler getScheduler(final Buffer> buffer) { - if (scheduler == null) { - return createScheduler(buffer); + + int numRetries = 0; + while (scheduler == null && numRetries++ < kinesisSourceConfig.getMaxInitializationAttempts()) { + try { + scheduler = createScheduler(buffer); + } catch (Exception ex) { + LOG.error("Caught exception when initializing KCL Scheduler. Will retry"); + } + + if (scheduler == null) { + try { + Thread.sleep(kinesisSourceConfig.getInitializationBackoffTime().toMillis()); + } catch (InterruptedException e){ + LOG.debug("Interrupted exception!"); + } + } } return scheduler; } @@ -158,7 +177,9 @@ public Scheduler createScheduler(final Buffer> buffer) { return new Scheduler( configsBuilder.checkpointConfig(), - configsBuilder.coordinatorConfig(), + configsBuilder.coordinatorConfig() + .schedulerInitializationBackoffTimeMillis(kinesisSourceConfig.getInitializationBackoffTime().toMillis()) + .maxInitializationAttempts(kinesisSourceConfig.getMaxInitializationAttempts()), configsBuilder.leaseManagementConfig().billingMode(BillingMode.PAY_PER_REQUEST), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java index 1414229813..64461fffc4 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfig.java @@ -24,6 +24,8 @@ public class KinesisSourceConfig { static final Duration DEFAULT_TIME_OUT_IN_MILLIS = Duration.ofMillis(1000); static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 100; static final Duration DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT = Duration.ofMinutes(10); + static final Duration DEFAULT_INITIALIZATION_BACKOFF_TIME = Duration.ofMillis(1000); + static final int DEFAULT_MAX_INITIALIZATION_ATTEMPTS = Integer.MAX_VALUE; @Getter @JsonProperty("streams") @@ -69,6 +71,14 @@ public class KinesisSourceConfig { public Duration getShardAcknowledgmentTimeout() { return shardAcknowledgmentTimeout; } + + @Getter + @JsonProperty("max_initialization_attempts") + private int maxInitializationAttempts = DEFAULT_MAX_INITIALIZATION_ATTEMPTS; + + @Getter + @JsonProperty("initialization_backoff_time") + private Duration initializationBackoffTime = DEFAULT_INITIALIZATION_BACKOFF_TIME; } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 12986d9969..2a95cba8c7 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -75,6 +75,7 @@ public class KinesisServiceTest { private static final int NUMBER_OF_RECORDS_TO_ACCUMULATE = 10; private static final int DEFAULT_MAX_RECORDS = 10000; private static final int IDLE_TIME_BETWEEN_READS_IN_MILLIS = 250; + private static final int DEFAULT_INITIALIZATION_ATTEMPTS = 10; private static final String awsAccountId = "123456789012"; private static final String streamArnFormat = "arn:aws:kinesis:us-east-1:%s:stream/%s"; private static final Instant streamCreationTime = Instant.now(); @@ -189,6 +190,7 @@ void setup() { streamConfigs.add(kinesisStreamConfig); when(kinesisSourceConfig.getStreams()).thenReturn(streamConfigs); when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE); + when(kinesisSourceConfig.getMaxInitializationAttempts()).thenReturn(DEFAULT_INITIALIZATION_ATTEMPTS); PluginModel pluginModel = mock(PluginModel.class); when(pluginModel.getPluginName()).thenReturn(codec_plugin_name); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java index 5846fe4b04..37cdc97f1f 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/configuration/KinesisSourceConfigTest.java @@ -75,6 +75,8 @@ void testSourceConfig() { assertThat(kinesisSourceConfig, notNullValue()); assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); + assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertTrue(kinesisSourceConfig.isAcknowledgments()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); @@ -104,6 +106,8 @@ void testSourceConfigWithStreamCodec() { assertThat(kinesisSourceConfig, notNullValue()); assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); + assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertFalse(kinesisSourceConfig.isAcknowledgments()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); @@ -134,6 +138,8 @@ void testSourceConfigWithInitialPosition() { assertThat(kinesisSourceConfig, notNullValue()); assertEquals(KinesisSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, kinesisSourceConfig.getNumberOfRecordsToAccumulate()); assertEquals(KinesisSourceConfig.DEFAULT_TIME_OUT_IN_MILLIS, kinesisSourceConfig.getBufferTimeout()); + assertEquals(KinesisSourceConfig.DEFAULT_MAX_INITIALIZATION_ATTEMPTS, kinesisSourceConfig.getMaxInitializationAttempts()); + assertEquals(KinesisSourceConfig.DEFAULT_INITIALIZATION_BACKOFF_TIME, kinesisSourceConfig.getInitializationBackoffTime()); assertFalse(kinesisSourceConfig.isAcknowledgments()); assertEquals(KinesisSourceConfig.DEFAULT_SHARD_ACKNOWLEDGEMENT_TIMEOUT, kinesisSourceConfig.getShardAcknowledgmentTimeout()); assertThat(kinesisSourceConfig.getAwsAuthenticationConfig(), notNullValue()); @@ -158,4 +164,4 @@ void testSourceConfigWithInitialPosition() { assertEquals(kinesisStreamConfig.getCheckPointInterval(), expectedCheckpointIntervals.get(kinesisStreamConfig.getName())); } } -} +} \ No newline at end of file