Skip to content

Commit

Permalink
Only allow single thread to check emptiness
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Oct 25, 2023
1 parent 5236e1d commit e402412
Showing 1 changed file with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand All @@ -68,6 +69,8 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
private static final Logger LOG = LoggerFactory.getLogger(KafkaCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
private static final ConcurrentHashMap<TopicPartition, Boolean> TOPIC_PARTITION_TO_IS_EMPTY = new ConcurrentHashMap<>();
private static Long topicEmptyCheckingOwnerThreadId = null;
static final String DEFAULT_KEY = "message";

private volatile long lastCommitTime;
Expand Down Expand Up @@ -531,7 +534,16 @@ final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap,
return Objects.isNull(offset) ? "-" : offset.toString();
}

public boolean isTopicEmpty() {
public synchronized boolean isTopicEmpty() {
final long currentThreadId = Thread.currentThread().getId();
if (Objects.isNull(topicEmptyCheckingOwnerThreadId)) {
topicEmptyCheckingOwnerThreadId = currentThreadId;
}

if (currentThreadId != topicEmptyCheckingOwnerThreadId) {
return TOPIC_PARTITION_TO_IS_EMPTY.values().stream().allMatch(isEmpty -> isEmpty);
}

final List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
Expand All @@ -548,16 +560,18 @@ public boolean isTopicEmpty() {
if (endOffset != 0L) {
// If there is no committed offset for the partition
if (Objects.isNull(offsetAndMetadata)) {
return false;
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, false);
}

// If the committed offset is behind the end offset
if (offsetAndMetadata.offset() < endOffset) {
return false;
TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, false);
}
}

TOPIC_PARTITION_TO_IS_EMPTY.put(topicPartition, true);
}

return true;
return TOPIC_PARTITION_TO_IS_EMPTY.values().stream().allMatch(isEmpty -> isEmpty);
}
}

0 comments on commit e402412

Please sign in to comment.