refactor: use immutable objects #199

Merged 4 commits on Jul 29, 2024

Changes from all commits
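At its core, the PR replaces mutable holder classes (getters, setters, maps mutated during collection) with Java records that are fully initialized at construction. A minimal sketch of the pattern, using a hypothetical one-field class (the real classes follow in the diff):

// Before: a mutable holder, populated field by field while collecting.
final class MutableLagData {
    private long lag = -1;

    public long getLag() { return lag; }

    public void setLag(final long lag) { this.lag = lag; }
}

// After: an immutable record; the value is computed once and never reassigned.
record LagData(long lag) {
}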
src/main/java/io/statnett/k3a/lagexporter/ClusterLagCollector.java
@@ -2,18 +2,26 @@

import io.statnett.k3a.lagexporter.model.ClusterData;
import io.statnett.k3a.lagexporter.model.ConsumerGroupData;
import io.statnett.k3a.lagexporter.model.ConsumerGroupOffset;
import io.statnett.k3a.lagexporter.model.TopicPartitionData;
import io.statnett.k3a.lagexporter.utils.RegexStringListFilter;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.groupingBy;

public final class ClusterLagCollector {

private static final Logger LOG = LoggerFactory.getLogger(ClusterLagCollector.class);
@@ -33,23 +41,25 @@ public ClusterLagCollector(final String clusterName,
}

public ClusterData collectClusterData() {
final ClusterData clusterData = new ClusterData(clusterName);
final Set<TopicPartition> topicPartitions = new HashSet<>();
final boolean clientConnected = client.isConnected();
final long startMs = System.currentTimeMillis();
final Set<String> allConsumerGroupIds = client.consumerGroupIds(consumerGroupFilter);
findConsumerGroupOffsets(allConsumerGroupIds, clusterData, topicPartitions);
findReplicaCounts(clusterData, topicPartitions);
findEndOffsetsAndUpdateLag(topicPartitions, clusterData);
final long pollTimeMs = System.currentTimeMillis() - startMs;
final Map<TopicPartition, List<ConsumerGroupOffset>> groupOffsets = findConsumerGroupOffsets(allConsumerGroupIds);
final Map<TopicPartition, TopicPartitionData> topicPartitionData = findTopicPartitionData(groupOffsets.keySet());
final Map<TopicPartitionData, List<ConsumerGroupData>> topicAndConsumerData = calculateLag(groupOffsets, topicPartitionData);
final long pollTimeMs;
if (clientConnected) {
clusterData.setPollTimeMs(pollTimeMs);
pollTimeMs = System.currentTimeMillis() - startMs;
} else {
pollTimeMs = -1;
}
final ClusterData clusterData = new ClusterData(clusterName, topicAndConsumerData, pollTimeMs);
LOG.info("Polled lag data for {} in {} ms", clusterName, pollTimeMs);
return clusterData;
}

private void findConsumerGroupOffsets(final Set<String> consumerGroupIds, final ClusterData clusterData, final Set<TopicPartition> topicPartitions) {
private Map<TopicPartition, List<ConsumerGroupOffset>> findConsumerGroupOffsets(final Set<String> consumerGroupIds) {
Set<ConsumerGroupOffset> consumerGroupOffsetSet = new HashSet<>();
client.consumerGroupOffsets(consumerGroupIds)
.forEach((consumerGroup, offsets) -> offsets.forEach((partition, offsetAndMetadata) -> {
final String topicName = partition.topic();
@@ -60,61 +70,66 @@ private void findConsumerGroupOffsets(final Set<String> consumerGroupIds, final
LOG.info("No offset data for partition {}", partition);
return;
}
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(partition);
final ConsumerGroupData consumerGroupData = topicPartitionData.findConsumerGroupData(consumerGroup);
consumerGroupData.setOffset(offsetAndMetadata.offset());
topicPartitions.add(partition);
consumerGroupOffsetSet.add(new ConsumerGroupOffset(partition, consumerGroup, offsetAndMetadata.offset()));
}));
return consumerGroupOffsetSet.stream().collect(groupingBy(ConsumerGroupOffset::topicPartition, Collectors.toUnmodifiableList()));
}

private Map<TopicPartition, TopicPartitionData> findTopicPartitionData(final Set<TopicPartition> topicPartitions) {
Map<TopicPartition, Integer> replicaCounts = findReplicaCounts(topicPartitions);
Map<TopicPartition, Long> endOffsets = findEndOffsets(topicPartitions);
HashMap<TopicPartition, TopicPartitionData> topicPartitionData = new HashMap<>();
for(TopicPartition topicPartition : topicPartitions) {
topicPartitionData.put(topicPartition, new TopicPartitionData(topicPartition, endOffsets.get(topicPartition), replicaCounts.get(topicPartition)));
}
return Collections.unmodifiableMap(topicPartitionData);
}

private void findReplicaCounts(final ClusterData clusterData, final Set<TopicPartition> topicPartitions) {
private Map<TopicPartition, Integer> findReplicaCounts(final Set<TopicPartition> topicPartitions) {
final Map<TopicPartition, Integer> replicaCounts = new HashMap<>();
Set<String> topics = topicPartitions.stream()
.map(TopicPartition::topic)
.collect(Collectors.toSet());
client.describeTopics(topics).values()
.forEach(topicDescription -> topicDescription.partitions().forEach(topicPartitionInfo -> {
final TopicPartition topicPartition = new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
clusterData.findTopicPartitionData(topicPartition).setNumReplicas(topicPartitionInfo.replicas().size());
replicaCounts.put(topicPartition, topicPartitionInfo.replicas().size());
}));
return Collections.unmodifiableMap(replicaCounts);
}

private void findEndOffsetsAndUpdateLag(final Set<TopicPartition> topicPartitions, final ClusterData clusterData) {
private Map<TopicPartition, Long> findEndOffsets(final Set<TopicPartition> topicPartitions) {
if (topicPartitions.isEmpty()) {
return;
return Collections.emptyMap();
}
long t = System.currentTimeMillis();
final Set<TopicPartition> multiReplicaPartitions = new HashSet<>();
final Set<TopicPartition> singleReplicaPartitions = new HashSet<>();
for (final TopicPartition topicPartition : topicPartitions) {
if (clusterData.findTopicPartitionData(topicPartition).getNumReplicas() > 1) {
multiReplicaPartitions.add(topicPartition);
} else {
singleReplicaPartitions.add(topicPartition);
}
}
findEndOffsetsAndUpdateLagImpl(multiReplicaPartitions, clusterData);
findEndOffsetsAndUpdateLagImpl(singleReplicaPartitions, clusterData);
t = System.currentTimeMillis() - t;
LOG.debug("Found end offsets in {} ms", t);
}

private void findEndOffsetsAndUpdateLagImpl(final Set<TopicPartition> topicPartitions, final ClusterData clusterData) {
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
try {
client.endOffsets(topicPartitions)
.forEach((partition, offset) -> {
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(partition);
topicPartitionData.calculateLags(offset == null ? -1 : offset);
endOffsets.put(partition, (offset == null ? -1 : offset));
});
} catch (final TimeoutException e) {
setLagUnknown(topicPartitions, clusterData);
LOG.debug("Timed out getting endOffsets");
}
t = System.currentTimeMillis() - t;
LOG.debug("Found end offsets in {} ms", t);
return Collections.unmodifiableMap(endOffsets);
}

private static void setLagUnknown(final Set<TopicPartition> topicPartitions, final ClusterData clusterData) {
for (final TopicPartition topicPartition : topicPartitions) {
final TopicPartitionData topicPartitionData = clusterData.findTopicPartitionData(topicPartition);
topicPartitionData.resetLags();
private static Map<TopicPartitionData, List<ConsumerGroupData>> calculateLag(
Map<TopicPartition, List<ConsumerGroupOffset>> consumerGroupOffsets,
Map<TopicPartition, TopicPartitionData> topicPartitionData
) {
Map<TopicPartitionData, List<ConsumerGroupData>> topicAndConsumerData = new HashMap<>();
for (Map.Entry<TopicPartition, List<ConsumerGroupOffset>> entry : consumerGroupOffsets.entrySet()) {
TopicPartitionData partitionData = topicPartitionData.get(entry.getKey());
List<ConsumerGroupData> consumerGroupData = new ArrayList<>(entry.getValue().size());
for(ConsumerGroupOffset consumerGroupOffset : entry.getValue()) {
consumerGroupData.add(new ConsumerGroupData(consumerGroupOffset, partitionData.endOffset()));
}
topicAndConsumerData.put(partitionData, consumerGroupData);
}
return Collections.unmodifiableMap(topicAndConsumerData);
}

}
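Taken together, collectClusterData now reads as a pipeline of value-producing steps (offsets, then partition data, then lag) rather than mutation of a shared ClusterData. A runnable toy version of that shape, with hypothetical stand-in records (the real types appear in the model files below):

import java.util.List;
import java.util.Map;

public final class PipelineSketch {
    // Hypothetical stand-ins for ConsumerGroupOffset and TopicPartitionData.
    record Offset(String group, long offset) {}
    record Partition(String name, long endOffset) {}

    public static void main(String[] args) {
        // Step 1: committed offsets grouped by partition (cf. findConsumerGroupOffsets).
        Map<Partition, List<Offset>> byPartition = Map.of(
                new Partition("orders-0", 120L), List.of(new Offset("billing", 100L)));
        // Step 2: lag is derived from immutable inputs (cf. calculateLag), never set later.
        byPartition.forEach((p, offsets) -> offsets.forEach(o ->
                System.out.println(o.group() + " lag=" + Math.max(0, p.endOffset() - o.offset()))));
    }
}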
22 changes: 11 additions & 11 deletions src/main/java/io/statnett/k3a/lagexporter/PrometheusReporter.java
@@ -4,9 +4,9 @@
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.statnett.k3a.lagexporter.model.ClusterData;
import io.statnett.k3a.lagexporter.model.ConsumerGroupData;
import io.statnett.k3a.lagexporter.model.TopicPartitionData;

import java.io.IOException;
import java.util.List;

public final class PrometheusReporter {

@@ -42,24 +42,24 @@ public void start() {
}

public void publish(final ClusterData clusterData) {
final String clusterName = clusterData.getClusterName();
final String clusterName = clusterData.clusterName();
publishConsumerGroupLag(clusterName, clusterData);
publishPollTimeMs(clusterName, clusterData);
}

private void publishConsumerGroupLag(final String clusterName, final ClusterData clusterData) {
for (final TopicPartitionData topicPartitionData : clusterData.getAllTopicPartitionData()) {
final String topic = topicPartitionData.getTopicPartition().topic();
final String partition = String.valueOf(topicPartitionData.getTopicPartition().partition());
for (final ConsumerGroupData consumerGroupData : topicPartitionData.getConsumerGroupDataMap().values()) {
final String consumerGroupId = consumerGroupData.getConsumerGroupId();
final long lag = consumerGroupData.getLag();
for (final List<ConsumerGroupData> consumerGroupDataList : clusterData.topicAndConsumerData().values()) {
for (final ConsumerGroupData consumerGroupData : consumerGroupDataList) {
final String topic = consumerGroupData.topicPartition().topic();
final String partition = String.valueOf(consumerGroupData.topicPartition().partition());
final String consumerGroupId = consumerGroupData.consumerGroupId();
final long lag = consumerGroupData.lag();
if (lag >= 0) {
consumerGroupLagGauge
.labelValues(clusterName, consumerGroupId, topic, partition)
.set(lag);
}
final long offset = consumerGroupData.getOffset();
final long offset = consumerGroupData.offset();
if (offset >= 0) {
consumerGroupOffsetGauge
.labelValues(clusterName, consumerGroupId, topic, partition)
@@ -70,10 +70,10 @@ private void publishConsumerGroupLag(final String clusterName, final ClusterData
}

private void publishPollTimeMs(final String clusterName, final ClusterData clusterData) {
if (clusterData.getPollTimeMs() >= 0L) {
if (clusterData.pollTimeMs() >= 0L) {
pollTimeMsGauge
.labelValues(clusterName)
.set(clusterData.getPollTimeMs());
.set(clusterData.pollTimeMs());
}
}
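For context, the reporter uses the Prometheus Java client 1.x gauge API. A minimal sketch of that call pattern, with a hypothetical metric name and registry/HTTP wiring omitted:

import io.prometheus.metrics.core.metrics.Gauge;

public final class GaugeSketch {
    public static void main(String[] args) {
        // Hypothetical gauge mirroring consumerGroupLagGauge in the diff.
        final Gauge lagGauge = Gauge.builder()
                .name("example_consumer_group_lag")  // hypothetical metric name
                .labelNames("cluster_name", "group", "topic", "partition")
                .register();
        final long lag = 20L;
        // Negative values mean "unknown" and are skipped, as in publishConsumerGroupLag.
        if (lag >= 0) {
            lagGauge.labelValues("prod", "billing", "orders", "0").set(lag);
        }
    }
}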

src/main/java/io/statnett/k3a/lagexporter/model/ClusterData.java
@@ -1,43 +1,11 @@
package io.statnett.k3a.lagexporter.model;

import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class ClusterData {

private final String clusterName;
private final Map<TopicPartition, TopicPartitionData> topicPartitionDataMap = new HashMap<>();
private long pollTimeMs = -1L;

public ClusterData(final String clusterName) {
this.clusterName = clusterName;
}

public String getClusterName() {
return clusterName;
}

public TopicPartitionData findTopicPartitionData(final TopicPartition topicPartition) {
synchronized (topicPartitionDataMap) {
return topicPartitionDataMap.computeIfAbsent(topicPartition, TopicPartitionData::new);
}
}

public Collection<TopicPartitionData> getAllTopicPartitionData() {
synchronized (topicPartitionDataMap) {
return topicPartitionDataMap.values();
}
}

public long getPollTimeMs() {
return pollTimeMs;
}

public void setPollTimeMs(final long pollTimeMs) {
this.pollTimeMs = pollTimeMs;
}

public record ClusterData(
String clusterName,
Map<TopicPartitionData, List<ConsumerGroupData>> topicAndConsumerData,
long pollTimeMs
) {
}
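Usage-wise the record is created exactly once, after collection has finished; a small illustration with hypothetical values:

import io.statnett.k3a.lagexporter.model.ClusterData;
import java.util.Map;

public final class ClusterDataExample {
    public static void main(String[] args) {
        // All state is supplied at construction; record accessors replace getters.
        final ClusterData data = new ClusterData("prod", Map.of(), 42L);
        System.out.println(data.clusterName() + " polled in " + data.pollTimeMs() + " ms");
        // There is no setPollTimeMs(...) any more -- the value cannot change afterwards.
    }
}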
src/main/java/io/statnett/k3a/lagexporter/model/ConsumerGroupData.java
@@ -1,33 +1,28 @@
package io.statnett.k3a.lagexporter.model;

public final class ConsumerGroupData {

private final String consumerGroupId;
private long offset = -1;
private long lag = -1;

ConsumerGroupData(final String consumerGroupId) {
this.consumerGroupId = consumerGroupId;
}

public String getConsumerGroupId() {
return consumerGroupId;
}

public long getOffset() {
return offset;
import org.apache.kafka.common.TopicPartition;

public record ConsumerGroupData(
TopicPartition topicPartition,
String consumerGroupId,
long offset,
long lag
) {

public ConsumerGroupData(ConsumerGroupOffset consumerGroupOffset, long endOffset) {
this(
consumerGroupOffset.topicPartition(),
consumerGroupOffset.consumerGroupId(),
consumerGroupOffset.offset(),
calculateLag(consumerGroupOffset, endOffset)
);
}

public void setOffset(final long offset) {
this.offset = offset;
private static long calculateLag(ConsumerGroupOffset consumerGroupOffset, long endOffset) {
if (endOffset < 0) {
return -1;
} else {
return Math.max(0, endOffset - consumerGroupOffset.offset());
}
}

public long getLag() {
return lag;
}

public void setLag(final long lag) {
this.lag = lag;
}

}
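The lag rule that previously lived in TopicPartitionData.calculateLags is now a pure function in the record's constructor; a quick check of its two branches (topic and group names hypothetical):

import io.statnett.k3a.lagexporter.model.ConsumerGroupData;
import io.statnett.k3a.lagexporter.model.ConsumerGroupOffset;
import org.apache.kafka.common.TopicPartition;

public final class LagSemanticsExample {
    public static void main(String[] args) {
        final ConsumerGroupOffset committed =
                new ConsumerGroupOffset(new TopicPartition("orders", 0), "billing", 100L);
        // Known end offset: lag = max(0, 120 - 100) = 20.
        System.out.println(new ConsumerGroupData(committed, 120L).lag());
        // Unknown end offset (< 0): lag is -1, replacing the old setLagUnknown/resetLags path.
        System.out.println(new ConsumerGroupData(committed, -1L).lag());
    }
}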
src/main/java/io/statnett/k3a/lagexporter/model/ConsumerGroupOffset.java (new file)
@@ -0,0 +1,10 @@
package io.statnett.k3a.lagexporter.model;

import org.apache.kafka.common.TopicPartition;

public record ConsumerGroupOffset(
TopicPartition topicPartition,
String consumerGroupId,
long offset
) {
}
src/main/java/io/statnett/k3a/lagexporter/model/TopicPartitionData.java
@@ -2,55 +2,9 @@

import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public final class TopicPartitionData {

private final TopicPartition topicPartition;
private int numReplicas = -1;
private final Map<String, ConsumerGroupData> consumerGroupDataMap = new HashMap<>();

public TopicPartitionData(final TopicPartition topicPartition) {
this.topicPartition = topicPartition;
}

public TopicPartition getTopicPartition() {
return topicPartition;
}

public int getNumReplicas() {
return numReplicas;
}

public void setNumReplicas(final int numReplicas) {
this.numReplicas = numReplicas;
}

public Map<String, ConsumerGroupData> getConsumerGroupDataMap() {
return consumerGroupDataMap;
}

public ConsumerGroupData findConsumerGroupData(final String consumerGroupId) {
synchronized (consumerGroupDataMap) {
return consumerGroupDataMap.computeIfAbsent(consumerGroupId, ConsumerGroupData::new);
}
}

public void resetLags() {
calculateLags(-1);
}

public void calculateLags(final long endOffset) {
synchronized (consumerGroupDataMap) {
for (final ConsumerGroupData consumerGroupData : consumerGroupDataMap.values()) {
if (endOffset < 0) {
consumerGroupData.setLag(-1);
} else {
consumerGroupData.setLag(Math.max(0, endOffset - consumerGroupData.getOffset()));
}
}
}
}

public record TopicPartitionData(
TopicPartition topicPartition,
long endOffset,
int numReplicas
) {
}
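A design note: ClusterData keys its map by TopicPartitionData, which works because records get value-based equals and hashCode for free. A small demonstration with hypothetical values:

import io.statnett.k3a.lagexporter.model.TopicPartitionData;
import org.apache.kafka.common.TopicPartition;

public final class RecordKeyExample {
    public static void main(String[] args) {
        final TopicPartition tp = new TopicPartition("orders", 0);
        // Two records with equal components are equal -- safe to use as map keys.
        System.out.println(new TopicPartitionData(tp, 120L, 3)
                .equals(new TopicPartitionData(tp, 120L, 3)));  // true
    }
}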