From 28335f32b3a2fff90b734d8b024ce150055747dc Mon Sep 17 00:00:00 2001 From: Emil Koutanov Date: Mon, 9 Mar 2020 17:39:36 +1100 Subject: [PATCH] Copes with transaction markers. Cloes #64 --- src/main/java/kafdrop/service/KafkaHighLevelConsumer.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index 3a97e011..875c02ab 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -42,6 +42,7 @@ private void initializeClient() { properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafdrop-consumer"); + properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); kafkaConfiguration.applyCommon(properties); kafkaConsumer = new KafkaConsumer<>(properties); @@ -92,6 +93,7 @@ synchronized List> getLatestRecords(TopicPartitio MessageDeserializer deserializer) { initializeClient(); final var partitions = Collections.singletonList(partition); + final var maxEmptyPolls = 3; // caps the number of empty polls kafkaConsumer.assign(partitions); kafkaConsumer.seek(partition, offset); @@ -102,12 +104,16 @@ synchronized List> getLatestRecords(TopicPartitio var currentOffset = offset - 1; // stop if got to count or get to the latest offset + var emptyPolls = 0; while (rawRecords.size() < count && currentOffset < latestOffset) { final var polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)).records(partition); - if (!polled.isEmpty()) { + if (! polled.isEmpty()) { rawRecords.addAll(polled); currentOffset = polled.get(polled.size() - 1).offset(); + emptyPolls = 0; + } else if (++emptyPolls == maxEmptyPolls) { + break; } }