Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8193 Batch write to AWS Kinesis #121

Closed
wants to merge 21 commits into from
Closed

Conversation

Kubha99
Copy link

@Kubha99 Kubha99 commented Sep 11, 2024

Added batch write to AWS Kinesis by switching to PutRecords API.

@Kubha99 Kubha99 changed the title DBZ-8193 DBZ-8193 Batch write to AWS Kinesis Sep 11, 2024
Copy link
Contributor

@jpechane jpechane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kubha99 Thanks for the PR! It is definitely good start but needs additional polishing. Please take a look at the comments.
Also wrt the build failure, in this case it was in Redis - it is unfortunately a transient environment error that we were not able to fix yet so it does not relate to the PR.

@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
private static final int batchSize = 500;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PLease make the batch size configurable

}

// Handle Error
boolean notSuccesfull = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean notSuccesfull = true;
boolean notSuccesful = true;

@@ -61,6 +66,7 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu
private Optional<String> endpointOverride;
private Optional<String> credentialsProfile;
private static final int DEFAULT_RETRIES = 5;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the retry coun configurable


// Split the records into batches of size 500
String streamName = records.get(0).destination();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add guard just in case the records is empty list. Should not happen but better to apply it as defensive policy.
Also what would happen if there will be records for multiple tables/destinations in the list?

PutRecordsResponse response = recordsSent(batchRequest, streamName);
attempts++;
if (response.failedRecordCount() > 0) {
Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a WARN message with response details

LOGGER.trace("Response Receieved: " + putRecordsResponse);
return putRecordsResponse;
}

private boolean recordSent(ChangeEvent<Object, Object> record) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the unsused method

@jpechane
Copy link
Contributor

The test has failed with heap space. Please double check if there are any memory leaks. I'll trigger the test re-execution.

Copy link
Contributor

@jpechane jpechane left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I left few more suggestion related to the code structure and one question related to mark processing.

return;
}

// Split the records into batches of size 500
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Split the records into batches of size 500
// Split the records into batches

private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);
private Integer batchSize;
private Integer RETRIES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private Integer RETRIES;
private Integer maxRetries;

throw new DebeziumException("Batch size must be greater than 0");
}
else if (batchSize > 500) {
throw new DebeziumException("Retries must be less than or equal to 500");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new DebeziumException("Retries must be less than or equal to 500");
throw new DebeziumException("Batch size must be less than or equal to 500");

@@ -74,13 +83,23 @@ public class KinesisChangeConsumer extends BaseChangeConsumer implements Debeziu

@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(500);
batchSize = config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(MAX_BATCH_SIZE);

if (batchSize <= 0) {
throw new DebeziumException("Batch size must be greater than 0");
}
else if (batchSize > 500) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
else if (batchSize > 500) {
else if (batchSize > MAX_BATCH_SIZE) {

throw new DebeziumException("Batch size must be greater than 0");
}
else if (batchSize > 500) {
throw new DebeziumException("Retries must be less than or equal to 500");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new DebeziumException("Retries must be less than or equal to 500");
throw new DebeziumException("Retries must be less than or equal to " + MAX_BATCH_SIZE);

}
}

for (ChangeEvent<Object, Object> record : batch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the batches are grouped by topic they will not be sent in total order - this is expected. Do you want also mark records as processed per-batch even if it does not conform to the total order or would you prefer to move it out of the batch loop and mark them at the end in a single loop over original records list?

@Kubha99 Kubha99 requested a review from jpechane October 3, 2024 12:05
@jpechane
Copy link
Contributor

jpechane commented Oct 4, 2024

Closing, merging via #129

@jpechane jpechane closed this Oct 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants