-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-8193 Batch write to AWS Kinesis #121
Changes from 17 commits
ce1154b
3035ce1
769fafe
65c8204
36fe2b2
3117cb5
907a590
0e8260b
79424b2
4e53567
8e8fe3b
eb7672b
dac5782
2c40c49
994629f
c5e4ee8
b7a970b
0eb05fb
6cb0a03
19e303d
6499271
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -7,8 +7,11 @@ | |||||||||
|
||||||||||
import java.net.URI; | ||||||||||
import java.time.Duration; | ||||||||||
import java.util.ArrayList; | ||||||||||
import java.util.List; | ||||||||||
import java.util.Map; | ||||||||||
import java.util.Optional; | ||||||||||
import java.util.stream.Collectors; | ||||||||||
|
||||||||||
import jakarta.annotation.PostConstruct; | ||||||||||
import jakarta.annotation.PreDestroy; | ||||||||||
|
@@ -38,7 +41,10 @@ | |||||||||
import software.amazon.awssdk.services.kinesis.KinesisClient; | ||||||||||
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.KinesisException; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse; | ||||||||||
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry; | ||||||||||
|
||||||||||
/** | ||||||||||
* Implementation of the consumer that delivers the messages into Amazon Kinesis destination. | ||||||||||
|
@@ -56,12 +62,15 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu | |||||||||
private static final String PROP_REGION_NAME = PROP_PREFIX + "region"; | ||||||||||
private static final String PROP_ENDPOINT_NAME = PROP_PREFIX + "endpoint"; | ||||||||||
private static final String PROP_CREDENTIALS_PROFILE = PROP_PREFIX + "credentials.profile"; | ||||||||||
private static final String PROP_BATCH_SIZE = PROP_PREFIX + "batch.size"; | ||||||||||
private static final String PROP_RETRIES = PROP_PREFIX + "default.retries"; | ||||||||||
|
||||||||||
private String region; | ||||||||||
private Optional<String> endpointOverride; | ||||||||||
private Optional<String> credentialsProfile; | ||||||||||
private static final int DEFAULT_RETRIES = 5; | ||||||||||
private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); | ||||||||||
private Integer batchSize; | ||||||||||
private Integer RETRIES; | ||||||||||
|
||||||||||
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") | ||||||||||
String nullKey; | ||||||||||
|
@@ -74,13 +83,23 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu | |||||||||
|
||||||||||
@PostConstruct | ||||||||||
void connect() { | ||||||||||
final Config config = ConfigProvider.getConfig(); | ||||||||||
batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
RETRIES = config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(5); | ||||||||||
|
||||||||||
if (batchSize <= 0) { | ||||||||||
throw new DebeziumException("Batch size must be greater than 0"); | ||||||||||
} | ||||||||||
else if (batchSize > 500) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
throw new DebeziumException("Retries must be less than or equal to 500"); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
if (customClient.isResolvable()) { | ||||||||||
client = customClient.get(); | ||||||||||
LOGGER.info("Obtained custom configured KinesisClient '{}'", client); | ||||||||||
return; | ||||||||||
} | ||||||||||
|
||||||||||
final Config config = ConfigProvider.getConfig(); | ||||||||||
region = config.getValue(PROP_REGION_NAME, String.class); | ||||||||||
endpointOverride = config.getOptionalValue(PROP_ENDPOINT_NAME, String.class); | ||||||||||
credentialsProfile = config.getOptionalValue(PROP_CREDENTIALS_PROFILE, String.class); | ||||||||||
|
@@ -106,41 +125,102 @@ void close() { | |||||||||
@Override | ||||||||||
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer) | ||||||||||
throws InterruptedException { | ||||||||||
for (ChangeEvent<Object, Object> record : records) { | ||||||||||
LOGGER.trace("Received event '{}'", record); | ||||||||||
|
||||||||||
int attempts = 0; | ||||||||||
while (!recordSent(record)) { | ||||||||||
attempts++; | ||||||||||
if (attempts >= DEFAULT_RETRIES) { | ||||||||||
throw new DebeziumException("Exceeded maximum number of attempts to publish event " + record); | ||||||||||
|
||||||||||
// Guard if records are empty | ||||||||||
if (records.isEmpty()) { | ||||||||||
committer.markBatchFinished(); | ||||||||||
return; | ||||||||||
} | ||||||||||
|
||||||||||
String streamName; | ||||||||||
List<ChangeEvent<Object, Object>> batch = new ArrayList<>(); | ||||||||||
|
||||||||||
// Group the records by destination | ||||||||||
Map<String, List<ChangeEvent<Object, Object>>> segmentedBatches = records.stream().collect(Collectors.groupingBy(record -> record.destination())); | ||||||||||
|
||||||||||
// Iterate over the segmentedBatches | ||||||||||
for (List<ChangeEvent<Object, Object>> segmentedBatch : segmentedBatches.values()) { | ||||||||||
// Iterate over the batch | ||||||||||
|
||||||||||
for (int i = 0; i < segmentedBatch.size(); i += batchSize) { | ||||||||||
|
||||||||||
// Create a sublist of the batch given the batchSize | ||||||||||
batch = segmentedBatch.subList(i, Math.min(i + batchSize, segmentedBatch.size())); | ||||||||||
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); | ||||||||||
streamName = batch.get(0).destination(); | ||||||||||
|
||||||||||
for (ChangeEvent<Object, Object> record : batch) { | ||||||||||
|
||||||||||
Object rv = record.value(); | ||||||||||
if (rv == null) { | ||||||||||
rv = ""; | ||||||||||
} | ||||||||||
PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder() | ||||||||||
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey) | ||||||||||
.data(SdkBytes.fromByteArray(getBytes(rv))).build(); | ||||||||||
putRecordsRequestEntryList.add(putRecordsRequestEntry); | ||||||||||
} | ||||||||||
|
||||||||||
// Handle Error | ||||||||||
boolean notSuccesful = true; | ||||||||||
int attempts = 0; | ||||||||||
List<PutRecordsRequestEntry> batchRequest = putRecordsRequestEntryList; | ||||||||||
|
||||||||||
while (notSuccesful) { | ||||||||||
|
||||||||||
if (attempts >= RETRIES) { | ||||||||||
throw new DebeziumException("Exceeded maximum number of attempts to publish event"); | ||||||||||
} | ||||||||||
|
||||||||||
try { | ||||||||||
PutRecordsResponse response = recordsSent(batchRequest, streamName); | ||||||||||
attempts++; | ||||||||||
if (response.failedRecordCount() > 0) { | ||||||||||
LOGGER.warn("Failed to send {} number of records, retrying", response.failedRecordCount()); | ||||||||||
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); | ||||||||||
|
||||||||||
final List<PutRecordsResultEntry> putRecordsResults = response.records(); | ||||||||||
List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); | ||||||||||
|
||||||||||
for (int index = 0; index < putRecordsResults.size(); index++) { | ||||||||||
PutRecordsResultEntry entryResult = putRecordsResults.get(index); | ||||||||||
if (entryResult.errorCode() != null) { | ||||||||||
failedRecordsList.add(putRecordsRequestEntryList.get(index)); | ||||||||||
} | ||||||||||
} | ||||||||||
batchRequest = failedRecordsList; | ||||||||||
|
||||||||||
} | ||||||||||
else { | ||||||||||
notSuccesful = false; | ||||||||||
} | ||||||||||
|
||||||||||
} | ||||||||||
catch (KinesisException exception) { | ||||||||||
LOGGER.warn("Failed to send record to {}", streamName, exception); | ||||||||||
attempts++; | ||||||||||
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
for (ChangeEvent<Object, Object> record : batch) { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As the batches are grouped by topic, they will not be sent in total order - this is expected. Do you also want to mark records as processed per batch even if that does not conform to the total order, or would you prefer to move it out of the batch loop and mark them at the end in a single loop over original |
||||||||||
committer.markProcessed(record); | ||||||||||
} | ||||||||||
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause(); | ||||||||||
} | ||||||||||
committer.markProcessed(record); | ||||||||||
} | ||||||||||
|
||||||||||
// Mark Batch Finished | ||||||||||
committer.markBatchFinished(); | ||||||||||
} | ||||||||||
|
||||||||||
private boolean recordSent(ChangeEvent<Object, Object> record) { | ||||||||||
Object rv = record.value(); | ||||||||||
if (rv == null) { | ||||||||||
rv = ""; | ||||||||||
} | ||||||||||
private PutRecordsResponse recordsSent(List<PutRecordsRequestEntry> putRecordsRequestEntryList, String streamName) { | ||||||||||
|
||||||||||
final PutRecordRequest putRecord = PutRecordRequest.builder() | ||||||||||
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey) | ||||||||||
.streamName(streamNameMapper.map(record.destination())) | ||||||||||
.data(SdkBytes.fromByteArray(getBytes(rv))) | ||||||||||
.build(); | ||||||||||
// Create a PutRecordsRequest | ||||||||||
PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder().streamName(streamNameMapper.map(streamName)).records(putRecordsRequestEntryList).build(); | ||||||||||
|
||||||||||
try { | ||||||||||
client.putRecord(putRecord); | ||||||||||
return true; | ||||||||||
} | ||||||||||
catch (KinesisException exception) { | ||||||||||
LOGGER.warn("Failed to send record to {}", record.destination(), exception); | ||||||||||
return false; | ||||||||||
} | ||||||||||
// Send Request | ||||||||||
PutRecordsResponse putRecordsResponse = client.putRecords(putRecordsRequest); | ||||||||||
LOGGER.trace("Response Receieved: " + putRecordsResponse); | ||||||||||
return putRecordsResponse; | ||||||||||
} | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.