-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSnapshotConsumer.java
54 lines (41 loc) · 1.84 KB
/
SnapshotConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
 * Command-line tool that prints a point-in-time snapshot of a Kafka topic.
 *
 * <p>Every assigned partition is rewound to its beginning and read up to the end
 * offset that was current when the partition was assigned. Each record is written
 * to standard output as {@code key=value}. Records appended after the end offset
 * was captured are not printed.
 *
 * <p>Usage: {@code SnapshotConsumer <bootstrap-servers> <topic>}
 */
public class SnapshotConsumer {

    /** Maximum time a single {@code poll} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the bootstrap server list, {@code args[1]} the topic name
     */
    public static void main(String[] args) {
        if (args == null || args.length < 2) {
            System.err.println("Usage: SnapshotConsumer <bootstrap-servers> <topic>");
            System.exit(1);
            return;
        }
        String servers = args[0];
        String topic = args[1];

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", "SnapshotConsumer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        OffsetInfo info = new OffsetInfo();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Position lookups are only valid for partitions we still own.
                    for (TopicPartition tp : partitions) {
                        info.endOffsets.remove(tp);
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    info.assigned = true;
                    if (partitions.isEmpty()) {
                        // Nothing was handed to this consumer; there is nothing to seek or measure.
                        return;
                    }
                    consumer.seekToBeginning(partitions);
                    // Capture the snapshot boundary of every partition, not just the first one.
                    info.endOffsets.putAll(consumer.endOffsets(partitions));
                    long highest = -1;
                    for (long end : info.endOffsets.values()) {
                        highest = Math.max(highest, end - 1);
                    }
                    info.lastOffset = highest;
                }
            });

            while (!info.empty) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (TopicPartition tp : records.partitions()) {
                    Long end = info.endOffsets.get(tp);
                    for (ConsumerRecord<String, String> record : records.records(tp)) {
                        if (end != null && record.offset() >= end) {
                            // Written after the snapshot boundary was captured.
                            break;
                        }
                        System.out.printf("%s=%s%n", record.key(), record.value());
                    }
                }
                // Decide completion from the consumer's position rather than from seeing one
                // specific offset: the record at end-1 may never be delivered (for example when
                // it was compacted away), and later records must not re-open the loop.
                info.empty = info.assigned && caughtUp(consumer, info.endOffsets);
            }
        }
    }

    /**
     * Reports whether the consumer has reached the recorded end offset of every partition.
     *
     * @param consumer   consumer whose positions are checked
     * @param endOffsets snapshot end offset per currently assigned partition
     * @return {@code true} when no partition has unread records below its end offset;
     *         also {@code true} when {@code endOffsets} is empty
     */
    private static boolean caughtUp(KafkaConsumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets) {
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            if (consumer.position(entry.getKey()) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Mutable progress state shared between the rebalance listener and the poll loop. */
    static class OffsetInfo {
        /** Highest "last record" offset among assigned partitions; -1 when all are empty. */
        public long lastOffset = 0;
        /** Set to {@code true} once there is nothing left to read; ends the poll loop. */
        public boolean empty = false;
        /** Set to {@code true} after the first partition assignment callback has run. */
        public boolean assigned = false;
        /** Snapshot end offset (exclusive) for each currently assigned partition. */
        public final Map<TopicPartition, Long> endOffsets = new HashMap<>();
    }
}