Skip to content

Commit

Permalink
Copes with transaction markers. Cloes #64
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed Mar 9, 2020
1 parent fd737b2 commit 28335f3
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ private void initializeClient() {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafdrop-consumer");
properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
kafkaConfiguration.applyCommon(properties);

kafkaConsumer = new KafkaConsumer<>(properties);
Expand Down Expand Up @@ -92,6 +93,7 @@ synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartitio
MessageDeserializer deserializer) {
initializeClient();
final var partitions = Collections.singletonList(partition);
final var maxEmptyPolls = 3; // caps the number of empty polls
kafkaConsumer.assign(partitions);
kafkaConsumer.seek(partition, offset);

Expand All @@ -102,12 +104,16 @@ synchronized List<ConsumerRecord<String, String>> getLatestRecords(TopicPartitio
var currentOffset = offset - 1;

// stop if got to count or get to the latest offset
var emptyPolls = 0;
while (rawRecords.size() < count && currentOffset < latestOffset) {
final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(partition);

if (!polled.isEmpty()) {
if (! polled.isEmpty()) {
rawRecords.addAll(polled);
currentOffset = polled.get(polled.size() - 1).offset();
emptyPolls = 0;
} else if (++emptyPolls == maxEmptyPolls) {
break;
}
}

Expand Down

0 comments on commit 28335f3

Please sign in to comment.