From ce1154b7c54e67154b2f2edc81c12d0ebd40c90c Mon Sep 17 00:00:00 2001 From: Kunal Bhatnagar Date: Mon, 2 Sep 2024 13:32:06 +0200 Subject: [PATCH 01/17] DBZ-8193 - Added batch write to Aws Kinesis --- .../server/kinesis/KinesisChangeConsumer.java | 92 +++++++++++++++++-- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index d5ac963c..4c174734 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -7,6 +7,7 @@ import java.net.URI; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -39,6 +40,10 @@ import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; /** * Implementation of the consumer that delivers the messages into Amazon Kinesis destination. 
@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private Optional endpointOverride; private Optional credentialsProfile; private static final int DEFAULT_RETRIES = 5; + private static final int batchSize = 500; private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") @@ -106,22 +112,94 @@ void close() { @Override public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { - for (ChangeEvent record : records) { - LOGGER.trace("Received event '{}'", record); + // Split the records into batches of size 500 + String streamName = records.get(0).destination(); + List>> batchRecords = batchList(records, batchSize); + // Process each batch to PutRecordsRequestEntry + for (List> batch : batchRecords) { + List putRecordsRequestEntryList = new ArrayList<>(); + for (ChangeEvent record : batch) { + + Object rv = record.value(); + if (rv == null) { + rv = ""; + } + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder().partitionKey((record.key() != null) ? 
getString(record.key()) : nullKey) + .data(SdkBytes.fromByteArray(getBytes(rv))).build(); + putRecordsRequestEntryList.add(putRecordsRequestEntry); + } + + // Handle Error + boolean notSuccesfull = true; int attempts = 0; - while (!recordSent(record)) { - attempts++; + List batchRequest = putRecordsRequestEntryList; + + while (notSuccesfull) { + if (attempts >= DEFAULT_RETRIES) { - throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record); + throw new DebeziumException("Exceeded maximum number of attempts to publish event"); + } + + try { + PutRecordsResponse response = recordsSent(batchRequest, streamName); + attempts++; + if (response.failedRecordCount() > 0) { + Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); + + final List putRecordsResults = response.records(); + List failedRecordsList = new ArrayList<>(); + + for (int index = 0; index < putRecordsResults.size(); index++) { + PutRecordsResultEntry entryResult = putRecordsResults.get(index); + if (entryResult.errorCode() != null) { + failedRecordsList.add(putRecordsRequestEntryList.get(index)); + } + } + batchRequest = failedRecordsList; + + } + else { + notSuccesfull = false; + } + } - Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); + catch (KinesisException exception) { + LOGGER.warn("Failed to send record to {}", streamName, exception); + attempts++; + Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); + } + } + + for (ChangeEvent record : batch) { + committer.markProcessed(record); } - committer.markProcessed(record); } + + // Mark Batch Finished committer.markBatchFinished(); } + private List>> batchList(List> inputList, final int maxSize) { + List>> batches = new ArrayList<>(); + final int size = inputList.size(); + for (int i = 0; i < size; i += maxSize) { + batches.add(new ArrayList<>(inputList.subList(i, Math.min(size, i + maxSize)))); + } + return batches; + } + + private PutRecordsResponse recordsSent(List putRecordsRequestEntryList, String 
streamName) { + + // Create a PutRecordsRequest + PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder().streamName(streamNameMapper.map(streamName)).records(putRecordsRequestEntryList).build(); + + // Send Request + PutRecordsResponse putRecordsResponse = client.putRecords(putRecordsRequest); + LOGGER.trace("Response Receieved: " + putRecordsResponse); + return putRecordsResponse; + } + private boolean recordSent(ChangeEvent record) { Object rv = record.value(); if (rv == null) { From 3035ce142da0d5d105f157d15f7059b32d720f3d Mon Sep 17 00:00:00 2001 From: Kunal Bhatnagar Date: Mon, 2 Sep 2024 13:32:18 +0200 Subject: [PATCH 02/17] DBZ-8193 - Added Unittest --- .../server/kinesis/KinesisUnitTest.java | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java diff --git a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java new file mode 100644 index 00000000..58e0f8c2 --- /dev/null +++ b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java @@ -0,0 +1,213 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.kinesis; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import jakarta.enterprise.inject.Instance; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.Header; +import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.KinesisException; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; + +@QuarkusTest +@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) +public class KinesisUnitTest { + + private KinesisChangeConsumer 
kinesisChangeConsumer; + private KinesisClient spyClient; + private AtomicInteger counter; + private AtomicBoolean threwException; + List> changeEvents; + RecordCommitter> committer; + + @BeforeEach + public void setup() { + counter = new AtomicInteger(0); + threwException = new AtomicBoolean(false); + changeEvents = createChangeEvents(500); + committer = RecordCommitter(); + spyClient = spy(KinesisClient.builder().region(Region.of(KinesisTestConfigSource.KINESIS_REGION)) + .credentialsProvider(ProfileCredentialsProvider.create("default")).build()); + + Instance mockInstance = mock(Instance.class); + when(mockInstance.isResolvable()).thenReturn(true); + when(mockInstance.get()).thenReturn(spyClient); + + kinesisChangeConsumer = new KinesisChangeConsumer(); + kinesisChangeConsumer.customClient = mockInstance; + } + + @AfterEach + public void tearDown() { + reset(spyClient); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static List> createChangeEvents(int size) { + List> changeEvents = new ArrayList<>(); + for (int i = 0; i < size; i++) { + ChangeEvent result = mock(ChangeEvent.class); + when(result.key()).thenReturn("key"); + when(result.value()).thenReturn(Integer.toString(i)); + when(result.destination()).thenReturn("dest"); + Header header = mock(Header.class); + when(header.getKey()).thenReturn("h1Key"); + when(header.getValue()).thenReturn("h1Value"); + when(result.headers()).thenReturn(List.of(header)); + changeEvents.add(result); + } + return changeEvents; + } + + @SuppressWarnings({ "unchecked" }) + private static RecordCommitter> RecordCommitter() { + RecordCommitter> result = mock(RecordCommitter.class); + return result; + } + + // 1. 
Test that continous sending of Kinesis response containing error yields exception after 5 attempts + @Test + public void testValidResponseWithErrorCode() throws Exception { + // Arrange + doAnswer(invocation -> { + PutRecordsRequest request = invocation.getArgument(0); + List records = request.records(); + counter.incrementAndGet(); + List failedEntries = records.stream().map(record -> PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException") + .errorMessage("The request rate for the stream is too high").build()).collect(Collectors.toList()); + + return PutRecordsResponse.builder().failedRecordCount(records.size()).records(failedEntries).build(); + }).when(spyClient).putRecords(any(PutRecordsRequest.class)); + + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, RecordCommitter()); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + assertTrue(threwException.get()); + // DEFAULT_RETRIES is 5 times + assertEquals(5, counter.get()); + } + + // 2. Test that continous return of exception yields Debezium exception after 5 attempts + @Test + public void testExceptionWhileWritingData() throws Exception { + // Arrange + doAnswer(invocation -> { + counter.incrementAndGet(); + throw KinesisException.builder().message("Kinesis Exception").build(); + }).when(spyClient).putRecords(any(PutRecordsRequest.class)); + + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, committer); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + assertTrue(threwException.get()); + // DEFAULT_RETRIES is 5 times + assertEquals(5, counter.get()); + } + + // 3. 
Test that only failed records are re-sent + @Test + public void testResendFailedRecords() throws Exception { + // Arrange + AtomicBoolean firstCall = new AtomicBoolean(true); + List failedRecordsFromFirstCall = new ArrayList<>(); + List recordsFromSecondCall = new ArrayList<>(); + doAnswer(invocation -> { + List response = new ArrayList<>(); + PutRecordsRequest request = invocation.getArgument(0); + List records = request.records(); + counter.incrementAndGet(); + + if (firstCall.get()) { + int failedEntries = 100; + for (int i = 0; i < records.size(); i++) { + PutRecordsResultEntry recordResult; + if (i < failedEntries) { + recordResult = PutRecordsResultEntry.builder().errorCode("ProvisionedThroughputExceededException") + .errorMessage("The request rate for the stream is too high").build(); + + failedRecordsFromFirstCall.add(records.get(i)); + } + else { + recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); + } + response.add(recordResult); + } + firstCall.getAndSet(false); + return PutRecordsResponse.builder().failedRecordCount(failedEntries).records(response).build(); + } + else { + for (PutRecordsRequestEntry record : records) { + recordsFromSecondCall.add(record); + PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); + response.add(recordResult); + } + return PutRecordsResponse.builder().failedRecordCount(0).records(response).build(); + } + }).when(spyClient).putRecords(any(PutRecordsRequest.class)); + + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, committer); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + assertFalse(threwException.get()); + assertEquals(2, counter.get()); + assertEquals(recordsFromSecondCall.size(), failedRecordsFromFirstCall.size()); + for (int i = 0; i < recordsFromSecondCall.size(); i++) { + 
assertEquals(failedRecordsFromFirstCall.get(i).data(), recordsFromSecondCall.get(i).data()); + } + } +} From 769fafe93d963336660c1e90e8284f51f421bdb2 Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 16:31:27 +0200 Subject: [PATCH 03/17] DBZ-8193: Changed spell check, comment on PR --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 4c174734..88551c88 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -131,11 +131,11 @@ public void handleBatch(List> records, RecordCommitt } // Handle Error - boolean notSuccesfull = true; + boolean notSuccesful = true; int attempts = 0; List batchRequest = putRecordsRequestEntryList; - while (notSuccesfull) { + while (notSuccesful) { if (attempts >= DEFAULT_RETRIES) { throw new DebeziumException("Exceeded maximum number of attempts to publish event"); @@ -160,7 +160,7 @@ public void handleBatch(List> records, RecordCommitt } else { - notSuccesfull = false; + notSuccesful = false; } } From 65c8204256bbf2f442958485c8590ba5a7bc4e6a Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 16:36:04 +0200 Subject: [PATCH 04/17] DBZ-8193: Removed unused method, comment on PR --- .../server/kinesis/KinesisChangeConsumer.java | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 88551c88..dca01b59 100644 --- 
a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -199,26 +199,4 @@ private PutRecordsResponse recordsSent(List putRecordsRe LOGGER.trace("Response Receieved: " + putRecordsResponse); return putRecordsResponse; } - - private boolean recordSent(ChangeEvent record) { - Object rv = record.value(); - if (rv == null) { - rv = ""; - } - - final PutRecordRequest putRecord = PutRecordRequest.builder() - .partitionKey((record.key() != null) ? getString(record.key()) : nullKey) - .streamName(streamNameMapper.map(record.destination())) - .data(SdkBytes.fromByteArray(getBytes(rv))) - .build(); - - try { - client.putRecord(putRecord); - return true; - } - catch (KinesisException exception) { - LOGGER.warn("Failed to send record to {}", record.destination(), exception); - return false; - } - } } From 36fe2b2988d6f8f94ce608276d5fffcaba9341d4 Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 17:41:20 +0200 Subject: [PATCH 05/17] DBZ-8193: Make variables configurable, comment from PR --- .../server/kinesis/KinesisChangeConsumer.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index dca01b59..3084299d 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -61,17 +61,24 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private static final String PROP_REGION_NAME = PROP_PREFIX + "region"; private static final String PROP_ENDPOINT_NAME = PROP_PREFIX + "endpoint"; private static final String 
PROP_CREDENTIALS_PROFILE = PROP_PREFIX + "credentials.profile"; + private static final String PROP_BATCH_SIZE = PROP_PREFIX + "batch.size"; + private static final String PROP_DEFAULT_RETRIES = PROP_PREFIX + "default.retries"; + private String region; private Optional endpointOverride; private Optional credentialsProfile; - private static final int DEFAULT_RETRIES = 5; - private static final int batchSize = 500; private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; + @ConfigProperty(name = PROP_BATCH_SIZE, defaultValue = "500") + int batchSize; + + @ConfigProperty(name = PROP_DEFAULT_RETRIES, defaultValue = "5") + int defaultRetries; + private KinesisClient client = null; @Inject @@ -145,6 +152,7 @@ public void handleBatch(List> records, RecordCommitt PutRecordsResponse response = recordsSent(batchRequest, streamName); attempts++; if (response.failedRecordCount() > 0) { + LOGGER.warn("Failed to send {} number of records, retrying", response.failedRecordCount()); Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); final List putRecordsResults = response.records(); From 3117cb58ec66aa2951ca9ad31176c9249fc762ea Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 18:16:03 +0200 Subject: [PATCH 06/17] DBZ-8193: Fixed typo in and renamed DEFAULT_RETRIES --- .../debezium/server/kinesis/KinesisChangeConsumer.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 3084299d..50c25ca3 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -39,7 +39,6 @@ import 
software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.KinesisException; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; @@ -62,8 +61,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private static final String PROP_ENDPOINT_NAME = PROP_PREFIX + "endpoint"; private static final String PROP_CREDENTIALS_PROFILE = PROP_PREFIX + "credentials.profile"; private static final String PROP_BATCH_SIZE = PROP_PREFIX + "batch.size"; - private static final String PROP_DEFAULT_RETRIES = PROP_PREFIX + "default.retries"; - + private static final String PROP_RETRIES = PROP_PREFIX + "default.retries"; private String region; private Optional endpointOverride; @@ -76,8 +74,8 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu @ConfigProperty(name = PROP_BATCH_SIZE, defaultValue = "500") int batchSize; - @ConfigProperty(name = PROP_DEFAULT_RETRIES, defaultValue = "5") - int defaultRetries; + @ConfigProperty(name = PROP_RETRIES, defaultValue = "5") + int RETRIES; private KinesisClient client = null; @@ -144,7 +142,7 @@ public void handleBatch(List> records, RecordCommitt while (notSuccesful) { - if (attempts >= DEFAULT_RETRIES) { + if (attempts >= RETRIES) { throw new DebeziumException("Exceeded maximum number of attempts to publish event"); } From 907a5902cba375ad6666fe9fd85e9d46e59f504e Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 18:29:57 +0200 Subject: [PATCH 07/17] DBZ-8193: Segment records by destination and then batches, comment from PR --- .../server/kinesis/KinesisChangeConsumer.java | 31 
++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 50c25ca3..e25d180d 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -8,7 +8,9 @@ import java.net.URI; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import jakarta.annotation.PostConstruct; @@ -119,11 +121,12 @@ public void handleBatch(List> records, RecordCommitt throws InterruptedException { // Split the records into batches of size 500 - String streamName = records.get(0).destination(); - List>> batchRecords = batchList(records, batchSize); + String streamName; + List>> batchRecords = createBatches(records, batchSize); // Process each batch to PutRecordsRequestEntry for (List> batch : batchRecords) { List putRecordsRequestEntryList = new ArrayList<>(); + streamName = batch.get(0).destination(); for (ChangeEvent record : batch) { Object rv = record.value(); @@ -186,13 +189,25 @@ public void handleBatch(List> records, RecordCommitt committer.markBatchFinished(); } - private List>> batchList(List> inputList, final int maxSize) { - List>> batches = new ArrayList<>(); - final int size = inputList.size(); - for (int i = 0; i < size; i += maxSize) { - batches.add(new ArrayList<>(inputList.subList(i, Math.min(size, i + maxSize)))); + private List>> createBatches(List> records, final int maxSize) { + Map>> segmentedRecords = new HashMap<>(); + + // Segment the records by destination + for (ChangeEvent record : records) { + String destination = record.destination(); + segmentedRecords.computeIfAbsent(destination, k -> new 
ArrayList<>()).add(record); + } + + List>> batchedRecords = new ArrayList<>(); + + // Divide each segment into batches of the specified size + for (List> segment : segmentedRecords.values()) { + for (int i = 0; i < segment.size(); i += batchSize) { + batchedRecords.add(segment.subList(i, Math.min(i + batchSize, segment.size()))); + } } - return batches; + + return batchedRecords; } private PutRecordsResponse recordsSent(List putRecordsRequestEntryList, String streamName) { From 0e8260b1ce6e53e4e77e9cf748744ab287d3cc5f Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Sun, 15 Sep 2024 18:32:18 +0200 Subject: [PATCH 08/17] DBZ-8193: Guard if list of records is empty, comment from PR --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index e25d180d..85c7f3d2 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -120,6 +120,12 @@ void close() { public void handleBatch(List> records, RecordCommitter> committer) throws InterruptedException { + // Guard if records are empty + if (records.isEmpty()) { + committer.markBatchFinished(); + return; + } + // Split the records into batches of size 500 String streamName; List>> batchRecords = createBatches(records, batchSize); From 79424b22337ac8ec6b9712fa1dd066dd717e903d Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Mon, 23 Sep 2024 19:31:44 +0200 Subject: [PATCH 09/17] . 
--- .../server/kinesis/KinesisChangeConsumer.java | 9 ++-- .../server/kinesis/KinesisUnitTest.java | 44 +++++++++++++++++-- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 85c7f3d2..5db586c3 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -8,10 +8,10 @@ import java.net.URI; import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -129,6 +129,7 @@ public void handleBatch(List> records, RecordCommitt // Split the records into batches of size 500 String streamName; List>> batchRecords = createBatches(records, batchSize); + // Process each batch to PutRecordsRequestEntry for (List> batch : batchRecords) { List putRecordsRequestEntryList = new ArrayList<>(); @@ -196,13 +197,9 @@ public void handleBatch(List> records, RecordCommitt } private List>> createBatches(List> records, final int maxSize) { - Map>> segmentedRecords = new HashMap<>(); // Segment the records by destination - for (ChangeEvent record : records) { - String destination = record.destination(); - segmentedRecords.computeIfAbsent(destination, k -> new ArrayList<>()).add(record); - } + Map>> segmentedRecords = records.stream().collect(Collectors.groupingBy(record -> record.destination())); List>> batchedRecords = new ArrayList<>(); diff --git a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java index 
58e0f8c2..d80e947a 100644 --- a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java +++ b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java @@ -58,7 +58,7 @@ public class KinesisUnitTest { public void setup() { counter = new AtomicInteger(0); threwException = new AtomicBoolean(false); - changeEvents = createChangeEvents(500); + changeEvents = createChangeEvents(500, "destination"); committer = RecordCommitter(); spyClient = spy(KinesisClient.builder().region(Region.of(KinesisTestConfigSource.KINESIS_REGION)) .credentialsProvider(ProfileCredentialsProvider.create("default")).build()); @@ -69,6 +69,8 @@ public void setup() { kinesisChangeConsumer = new KinesisChangeConsumer(); kinesisChangeConsumer.customClient = mockInstance; + kinesisChangeConsumer.batchSize = 500; + kinesisChangeConsumer.RETRIES = 5; } @AfterEach @@ -77,13 +79,13 @@ public void tearDown() { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private static List> createChangeEvents(int size) { + private static List> createChangeEvents(int size, String destination) { List> changeEvents = new ArrayList<>(); for (int i = 0; i < size; i++) { ChangeEvent result = mock(ChangeEvent.class); when(result.key()).thenReturn("key"); when(result.value()).thenReturn(Integer.toString(i)); - when(result.destination()).thenReturn("dest"); + when(result.destination()).thenReturn(destination); Header header = mock(Header.class); when(header.getKey()).thenReturn("h1Key"); when(header.getValue()).thenReturn("h1Value"); @@ -210,4 +212,40 @@ public void testResendFailedRecords() throws Exception { assertEquals(failedRecordsFromFirstCall.get(i).data(), recordsFromSecondCall.get(i).data()); } } + + @Test + public void testBatchesAreCorrect() throws Exception { + // Arrange + AtomicInteger numRecordsDestinationOne = new AtomicInteger(0); + AtomicInteger numRrecordsDestinationTwo = new AtomicInteger(0); + + doAnswer(invocation -> { + List response = new 
ArrayList<>(); + PutRecordsRequest request = invocation.getArgument(0); + List records = request.records(); + counter.incrementAndGet(); + + else { + for (PutRecordsRequestEntry record : records) { + recordsFromSecondCall.add(record); + PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); + response.add(recordResult); + } + return PutRecordsResponse.builder().failedRecordCount(0).records(response).build(); + } + }).when(spyClient).putRecords(any(PutRecordsRequest.class)); + + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, committer); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + assertTrue(threwException.get()); + // DEFAULT_RETRIES is 5 times + assertEquals(5, counter.get()); } From 4e53567ead8b17d2a7255ad94373aa5cde5913df Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Mon, 23 Sep 2024 21:17:41 +0200 Subject: [PATCH 10/17] DBZ-8193 Memory Leak --- .../server/kinesis/KinesisChangeConsumer.java | 119 +++++++++--------- .../server/kinesis/KinesisUnitTest.java | 114 ++++++++++++++--- 2 files changed, 155 insertions(+), 78 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 5db586c3..ee26ed79 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -126,69 +126,79 @@ public void handleBatch(List> records, RecordCommitt return; } - // Split the records into batches of size 500 String streamName; - List>> batchRecords = createBatches(records, batchSize); + List> batch = new ArrayList<>(); - // Process each batch to PutRecordsRequestEntry - for (List> batch : batchRecords) { 
- List putRecordsRequestEntryList = new ArrayList<>(); - streamName = batch.get(0).destination(); - for (ChangeEvent record : batch) { + // Group the records by destination + Map>> segmentedBatches = records.stream().collect(Collectors.groupingBy(record -> record.destination())); - Object rv = record.value(); - if (rv == null) { - rv = ""; - } - PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder().partitionKey((record.key() != null) ? getString(record.key()) : nullKey) - .data(SdkBytes.fromByteArray(getBytes(rv))).build(); - putRecordsRequestEntryList.add(putRecordsRequestEntry); - } + // Iterate over the segmentedBatches + for (List> segmentedBatch : segmentedBatches.values()) { + // Iterate over the batch + + for (int i = 0; i < segmentedBatch.size(); i += batchSize) { - // Handle Error - boolean notSuccesful = true; - int attempts = 0; - List batchRequest = putRecordsRequestEntryList; + // Create a sublist of the batch given the batchSize + batch = segmentedBatch.subList(i, Math.min(i + batchSize, segmentedBatch.size())); + List putRecordsRequestEntryList = new ArrayList<>(); + streamName = batch.get(0).destination(); - while (notSuccesful) { + for (ChangeEvent record : batch) { - if (attempts >= RETRIES) { - throw new DebeziumException("Exceeded maximum number of attempts to publish event"); + Object rv = record.value(); + if (rv == null) { + rv = ""; + } + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder().partitionKey((record.key() != null) ? 
getString(record.key()) : nullKey) + .data(SdkBytes.fromByteArray(getBytes(rv))).build(); + putRecordsRequestEntryList.add(putRecordsRequestEntry); } - try { - PutRecordsResponse response = recordsSent(batchRequest, streamName); - attempts++; - if (response.failedRecordCount() > 0) { - LOGGER.warn("Failed to send {} number of records, retrying", response.failedRecordCount()); - Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); + // Handle Error + boolean notSuccesful = true; + int attempts = 0; + List batchRequest = putRecordsRequestEntryList; + + while (notSuccesful) { - final List putRecordsResults = response.records(); - List failedRecordsList = new ArrayList<>(); + if (attempts >= RETRIES) { + throw new DebeziumException("Exceeded maximum number of attempts to publish event"); + } - for (int index = 0; index < putRecordsResults.size(); index++) { - PutRecordsResultEntry entryResult = putRecordsResults.get(index); - if (entryResult.errorCode() != null) { - failedRecordsList.add(putRecordsRequestEntryList.get(index)); + try { + PutRecordsResponse response = recordsSent(batchRequest, streamName); + attempts++; + if (response.failedRecordCount() > 0) { + LOGGER.warn("Failed to send {} number of records, retrying", response.failedRecordCount()); + Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); + + final List putRecordsResults = response.records(); + List failedRecordsList = new ArrayList<>(); + + for (int index = 0; index < putRecordsResults.size(); index++) { + PutRecordsResultEntry entryResult = putRecordsResults.get(index); + if (entryResult.errorCode() != null) { + failedRecordsList.add(putRecordsRequestEntryList.get(index)); + } } + batchRequest = failedRecordsList; + + } + else { + notSuccesful = false; } - batchRequest = failedRecordsList; } - else { - notSuccesful = false; + catch (KinesisException exception) { + LOGGER.warn("Failed to send record to {}", streamName, exception); + attempts++; + Metronome.sleeper(RETRY_INTERVAL, 
Clock.SYSTEM).pause(); } - - } - catch (KinesisException exception) { - LOGGER.warn("Failed to send record to {}", streamName, exception); - attempts++; - Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); } - } - for (ChangeEvent record : batch) { - committer.markProcessed(record); + for (ChangeEvent record : batch) { + committer.markProcessed(record); + } } } @@ -196,23 +206,6 @@ public void handleBatch(List> records, RecordCommitt committer.markBatchFinished(); } - private List>> createBatches(List> records, final int maxSize) { - - // Segment the records by destination - Map>> segmentedRecords = records.stream().collect(Collectors.groupingBy(record -> record.destination())); - - List>> batchedRecords = new ArrayList<>(); - - // Divide each segment into batches of the specified size - for (List> segment : segmentedRecords.values()) { - for (int i = 0; i < segment.size(); i += batchSize) { - batchedRecords.add(segment.subList(i, Math.min(i + batchSize, segment.size()))); - } - } - - return batchedRecords; - } - private PutRecordsResponse recordsSent(List putRecordsRequestEntryList, String streamName) { // Create a PutRecordsRequest diff --git a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java index d80e947a..8a95a019 100644 --- a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java +++ b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java @@ -58,7 +58,7 @@ public class KinesisUnitTest { public void setup() { counter = new AtomicInteger(0); threwException = new AtomicBoolean(false); - changeEvents = createChangeEvents(500, "destination"); + changeEvents = createChangeEvents(500, "key", "destination"); committer = RecordCommitter(); spyClient = spy(KinesisClient.builder().region(Region.of(KinesisTestConfigSource.KINESIS_REGION)) 
.credentialsProvider(ProfileCredentialsProvider.create("default")).build()); @@ -79,16 +79,16 @@ public void tearDown() { } @SuppressWarnings({ "rawtypes", "unchecked" }) - private static List> createChangeEvents(int size, String destination) { + private static List> createChangeEvents(int size, String key, String destination) { List> changeEvents = new ArrayList<>(); for (int i = 0; i < size; i++) { ChangeEvent result = mock(ChangeEvent.class); - when(result.key()).thenReturn("key"); + when(result.key()).thenReturn(key); when(result.value()).thenReturn(Integer.toString(i)); when(result.destination()).thenReturn(destination); Header header = mock(Header.class); - when(header.getKey()).thenReturn("h1Key"); - when(header.getValue()).thenReturn("h1Value"); + when(header.getKey()).thenReturn(key); + when(header.getValue()).thenReturn(Integer.toString(i)); when(result.headers()).thenReturn(List.of(header)); changeEvents.add(result); } @@ -213,26 +213,107 @@ public void testResendFailedRecords() throws Exception { } } + // 4. 
Create 600 ChangeEvents to destination 1 and 600 to destination 2 and test that they are correctly batched @Test public void testBatchesAreCorrect() throws Exception { // Arrange + List> changeEvents = new ArrayList<>(); + String destinationOne = "dest1"; + String destinationTwo = "dest2"; + + // call createEvents with 600 records for destination 1 and 600 records for destination 2 + changeEvents = createChangeEvents(600, destinationOne, destinationOne); + changeEvents.addAll(createChangeEvents(600, destinationTwo, destinationTwo)); + AtomicInteger numRecordsDestinationOne = new AtomicInteger(0); AtomicInteger numRrecordsDestinationTwo = new AtomicInteger(0); + AtomicInteger numBatches = new AtomicInteger(0); doAnswer(invocation -> { List response = new ArrayList<>(); PutRecordsRequest request = invocation.getArgument(0); List records = request.records(); - counter.incrementAndGet(); + for (PutRecordsRequestEntry record : records) { + if (record.partitionKey().equals(destinationOne)) { + numRecordsDestinationOne.incrementAndGet(); + } + else if (record.partitionKey().equals(destinationTwo)) { + numRrecordsDestinationTwo.incrementAndGet(); + } + PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); + response.add(recordResult); + } + numBatches.incrementAndGet(); + return PutRecordsResponse.builder().failedRecordCount(0).records(response).build(); + }).when(spyClient).putRecords(any(PutRecordsRequest.class)); - else { - for (PutRecordsRequestEntry record : records) { - recordsFromSecondCall.add(record); - PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); - response.add(recordResult); + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, committer); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + // No exception should be thrown + 
assertFalse(threwException.get()); + // 2 destinations, 600 records each + assertEquals(600, numRecordsDestinationOne.get()); + assertEquals(600, numRrecordsDestinationTwo.get()); + // 2 destinations, 2 batches each + assertEquals(4, numBatches.get()); + } + + // 5. Test that empty records are handled correctly + @Test + public void testEmptyRecords() throws Exception { + // Arrange + List> changeEvents = new ArrayList<>(); + + // Act + try { + kinesisChangeConsumer.connect(); + kinesisChangeConsumer.handleBatch(changeEvents, committer); + } + catch (Exception e) { + threwException.getAndSet(true); + } + + // Assert + assertFalse(threwException.get()); + } + + // 6. Test that a batch of 1000 records is correctly split into 2 batches of 500 records + @Test + public void testBatchSplitting() throws Exception { + // Arrange + List> changeEvents = createChangeEvents(1000, "key", "destination"); + + AtomicInteger numBatches = new AtomicInteger(0); + AtomicInteger numRecordsBatchOne = new AtomicInteger(0); + AtomicInteger numRecordsBatchTwo = new AtomicInteger(0); + AtomicBoolean firstBatch = new AtomicBoolean(true); + + doAnswer(invocation -> { + List response = new ArrayList<>(); + PutRecordsRequest request = invocation.getArgument(0); + List records = request.records(); + + for (PutRecordsRequestEntry record : records) { + if (firstBatch.get()) { + numRecordsBatchOne.incrementAndGet(); } - return PutRecordsResponse.builder().failedRecordCount(0).records(response).build(); + else { + numRecordsBatchTwo.incrementAndGet(); + } + PutRecordsResultEntry recordResult = PutRecordsResultEntry.builder().shardId("shardId").sequenceNumber("sequenceNumber").build(); + response.add(recordResult); } + numBatches.incrementAndGet(); + firstBatch.getAndSet(false); + return PutRecordsResponse.builder().failedRecordCount(0).records(response).build(); }).when(spyClient).putRecords(any(PutRecordsRequest.class)); // Act @@ -245,7 +326,10 @@ public void testBatchesAreCorrect() throws 
Exception { } // Assert - assertTrue(threwException.get()); - // DEFAULT_RETRIES is 5 times - assertEquals(5, counter.get()); + assertFalse(threwException.get()); + assertEquals(2, numBatches.get()); + assertEquals(500, numRecordsBatchOne.get()); + assertEquals(500, numRecordsBatchTwo.get()); + } + } From 8e8fe3b123ac25df875d73e54202bde3ac195962 Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Mon, 23 Sep 2024 21:29:48 +0200 Subject: [PATCH 11/17] DBZ-8193 Ran Checkstyle --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 5 +++-- .../java/io/debezium/server/kinesis/KinesisUnitTest.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index ee26ed79..a92ed341 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -135,7 +135,7 @@ public void handleBatch(List> records, RecordCommitt // Iterate over the segmentedBatches for (List> segmentedBatch : segmentedBatches.values()) { // Iterate over the batch - + for (int i = 0; i < segmentedBatch.size(); i += batchSize) { // Create a sublist of the batch given the batchSize @@ -149,7 +149,8 @@ public void handleBatch(List> records, RecordCommitt if (rv == null) { rv = ""; } - PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder().partitionKey((record.key() != null) ? getString(record.key()) : nullKey) + PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder() + .partitionKey((record.key() != null) ? 
getString(record.key()) : nullKey) .data(SdkBytes.fromByteArray(getBytes(rv))).build(); putRecordsRequestEntryList.add(putRecordsRequestEntry); } diff --git a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java index 8a95a019..97162052 100644 --- a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java +++ b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java @@ -331,5 +331,5 @@ public void testBatchSplitting() throws Exception { assertEquals(500, numRecordsBatchOne.get()); assertEquals(500, numRecordsBatchTwo.get()); } - + } From eb7672bd243df9f6e79e15a130e3adb6ce6ea6fa Mon Sep 17 00:00:00 2001 From: Kunal Bhatnagar <1kunalbhatnagar@gmail.com> Date: Tue, 24 Sep 2024 14:21:32 +0200 Subject: [PATCH 12/17] DBZ-8193 make varaibles configurable --- .../server/kinesis/KinesisChangeConsumer.java | 13 ++++++------- .../io/debezium/server/kinesis/KinesisUnitTest.java | 2 -- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index a92ed341..32a4afe6 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -69,16 +69,12 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private Optional endpointOverride; private Optional credentialsProfile; private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); + private Integer batchSize; + private Integer RETRIES; @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; - @ConfigProperty(name = PROP_BATCH_SIZE, defaultValue = "500") - 
int batchSize; - - @ConfigProperty(name = PROP_RETRIES, defaultValue = "5") - int RETRIES; - private KinesisClient client = null; @Inject @@ -87,13 +83,16 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu @PostConstruct void connect() { + final Config config = ConfigProvider.getConfig(); + batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500); + RETRIES = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(5); + if (customClient.isResolvable()) { client = customClient.get(); LOGGER.info("Obtained custom configured KinesisClient '{}'", client); return; } - final Config config = ConfigProvider.getConfig(); region = config.getValue(PROP_REGION_NAME, String.class); endpointOverride = config.getOptionalValue(PROP_ENDPOINT_NAME, String.class); credentialsProfile = config.getOptionalValue(PROP_CREDENTIALS_PROFILE, String.class); diff --git a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java index 97162052..573890f3 100644 --- a/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java +++ b/debezium-server-kinesis/src/test/java/io/debezium/server/kinesis/KinesisUnitTest.java @@ -69,8 +69,6 @@ public void setup() { kinesisChangeConsumer = new KinesisChangeConsumer(); kinesisChangeConsumer.customClient = mockInstance; - kinesisChangeConsumer.batchSize = 500; - kinesisChangeConsumer.RETRIES = 5; } @AfterEach From dac5782f86c94b35da500e882b4a9e9e161d8258 Mon Sep 17 00:00:00 2001 From: Kubha99 <1kunalbhatnagar@gmail.com> Date: Tue, 24 Sep 2024 14:32:44 +0200 Subject: [PATCH 13/17] DBZ-8193 Guard for batch size --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java 
b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 32a4afe6..79058ecc 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -87,6 +87,12 @@ void connect() { batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500); RETRIES = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(5); + if (batchSize <= 0) { + throw new DebeziumException("Batch size must be greater than 0"); + } else if (batchSize > 500) { + throw new DebeziumException("Retries must be less than or equal to 500"); + } + if (customClient.isResolvable()) { client = customClient.get(); LOGGER.info("Obtained custom configured KinesisClient '{}'", client); From c5e4ee8589d2ea62344bddd2b308c6b8b88295a5 Mon Sep 17 00:00:00 2001 From: Kunal Bhatnagar <1kunalbhatnagar@gmail.com> Date: Wed, 25 Sep 2024 09:24:29 +0200 Subject: [PATCH 14/17] DBZ-8193 Ran Checkstyle --- .../java/io/debezium/server/kinesis/KinesisChangeConsumer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 79058ecc..5f146e65 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -89,7 +89,8 @@ void connect() { if (batchSize <= 0) { throw new DebeziumException("Batch size must be greater than 0"); - } else if (batchSize > 500) { + } + else if (batchSize > 500) { throw new DebeziumException("Retries must be less than or equal to 500"); } From 0eb05fbfe6b46abc9357d56a4933286c96b14924 Mon Sep 17 00:00:00 2001 From: Kunal Bhatnagar Date: Thu, 3 Oct 2024 09:48:00 
+0200 Subject: [PATCH 15/17] DBZ - 8193 Added changes from comments --- .../server/kinesis/KinesisChangeConsumer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index 5f146e65..a07ee43a 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -70,7 +70,8 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private Optional credentialsProfile; private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); private Integer batchSize; - private Integer RETRIES; + private Integer maxRetries; + private Integer MAX_BATCH_SIZE = 500; @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; @@ -84,14 +85,14 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu @PostConstruct void connect() { final Config config = ConfigProvider.getConfig(); - batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500); - RETRIES = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(5); + batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(MAX_BATCH_SIZE); + maxRetries = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(5); if (batchSize <= 0) { throw new DebeziumException("Batch size must be greater than 0"); } - else if (batchSize > 500) { - throw new DebeziumException("Retries must be less than or equal to 500"); + else if (batchSize > MAX_BATCH_SIZE) { + throw new DebeziumException("Batch size must be less than or equal to MAX_BATCH_SIZE"); } if (customClient.isResolvable()) { @@ -168,7 +169,7 @@ public void handleBatch(List> records, RecordCommitt while 
(notSuccesful) { - if (attempts >= RETRIES) { + if (attempts >= maxRetries) { throw new DebeziumException("Exceeded maximum number of attempts to publish event"); } From 6cb0a0359fa9fca2a02020b64c58f56478fe8e5e Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Fri, 4 Oct 2024 09:43:48 +0200 Subject: [PATCH 16/17] DBZ-8193 Re-organize constants --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index a07ee43a..f03e29f7 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -65,13 +65,15 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu private static final String PROP_BATCH_SIZE = PROP_PREFIX + "batch.size"; private static final String PROP_RETRIES = PROP_PREFIX + "default.retries"; + private static final int DEFAULT_RETRY_COUNT = 5; + private static final int MAX_BATCH_SIZE = 500; + private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); + private String region; private Optional endpointOverride; private Optional credentialsProfile; - private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); private Integer batchSize; private Integer maxRetries; - private Integer MAX_BATCH_SIZE = 500; @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") String nullKey; @@ -86,7 +88,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu void connect() { final Config config = ConfigProvider.getConfig(); batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(MAX_BATCH_SIZE); - maxRetries = config.getOptionalValue(PROP_RETRIES, 
Integer.class).orElse(5); + maxRetries = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(DEFAULT_RETRY_COUNT); if (batchSize <= 0) { throw new DebeziumException("Batch size must be greater than 0"); From 19e303d3340e2f0140030e9cc76953624dfbbc9e Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Fri, 4 Oct 2024 09:44:32 +0200 Subject: [PATCH 17/17] DBZ-8193 Reset retries on success --- .../java/io/debezium/server/kinesis/KinesisChangeConsumer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index f03e29f7..c8132008 100644 --- a/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -196,6 +196,7 @@ public void handleBatch(List> records, RecordCommitt } else { notSuccesful = false; + attempts = 0; } }