Skip to content

Commit

Permalink
Fix logic and add 1 minute wait before requerying kafka
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 25, 2023
1 parent e402412 commit f705ea0
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener

private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
private static final ConcurrentHashMap<TopicPartition, Boolean> TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>();
private static Long topicEmptyCheckingOwnerThreadId = null;
private static long lastIsEmptyCheckTime = 0;
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
Expand Down Expand Up @@ -540,7 +542,7 @@ public synchronized boolean isTopicEmpty() {
topicEmptyCheckingOwnerThreadId = currentThreadId;
}

if (currentThreadId != topicEmptyCheckingOwnerThreadId) {
if (currentThreadId != topicEmptyCheckingOwnerThreadId || System.currentTimeMillis() < lastIsEmptyCheckTime + IS_EMPTY_CHECK_INTERVAL_MS) {
return TOPIC_PARTITION_TO_IS_EMPTY.values().stream().allMatch(isEmpty -> isEmpty);
}

Expand All @@ -555,23 +557,20 @@ public synchronized boolean isTopicEmpty() {
for (TopicPartition topicPartition : topicPartitions) {
final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
LOG.info("Partition {} offsets: endOffset: {}, committedOffset: {}",
topicPartition, endOffset, Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, true);

// If there is data in the partition
if (endOffset != 0L) {
// If there is no committed offset for the partition
if (Objects.isNull(offsetAndMetadata)) {
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, false);
}

// If the committed offset is behind the end offset
if (offsetAndMetadata.offset() < endOffset) {
// If there is no committed offset for the partition or the committed offset is behind the end offset
if (Objects.isNull(offsetAndMetadata) || offsetAndMetadata.offset() < endOffset) {
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, false);
}
}

TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, true);
}

lastIsEmptyCheckTime = System.currentTimeMillis();
return TOPIC_PARTITION_TO_IS_EMPTY.values().stream().allMatch(isEmpty -> isEmpty);
}
}

0 comments on commit f705ea0

Please sign in to comment.