Skip to content

Commit

Permalink
FE: Add Prevoius button to topic message page - backend side (kafbat#550
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hadisfr committed Sep 17, 2024
1 parent 7d68c19 commit 6493775
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 17 deletions.
7 changes: 5 additions & 2 deletions api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ void incFilterApplyError() {
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
String previousCursorId = cursor != null ? cursor.getPreviousCursorId() : null;
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.prevCursor( // FIXME
null
.prevCursor(
previousCursorId != null
? new TopicMessagePageCursorDTO().id(previousCursorId)
: null
)
.nextCursor(
cursor != null
Expand Down
19 changes: 16 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -22,7 +24,9 @@ public static class Tracking {
private final ConsumerPosition originalPosition;
private final Predicate<TopicMessageDTO> filter;
private final int limit;
private final Function<Cursor, String> registerAction;
private final String cursorId;
private final BiFunction<Cursor, String, String> registerAction;
private final Function<String, Optional<String>> previousCursorIdGetter;

//topic -> partition -> offset
private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
Expand All @@ -31,12 +35,16 @@ public Tracking(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
Function<Cursor, String> registerAction) {
String cursorId,
BiFunction<Cursor, String, String> registerAction,
Function<String, Optional<String>> previousCursorIdGetter) {
this.deserializer = deserializer;
this.originalPosition = originalPosition;
this.filter = filter;
this.limit = limit;
this.cursorId = cursorId;
this.registerAction = registerAction;
this.previousCursorIdGetter = previousCursorIdGetter;
}

void trackOffset(String topic, int partition, long offset) {
Expand Down Expand Up @@ -82,9 +90,14 @@ String registerCursor() {
),
filter,
limit
)
),
this.cursorId
);
}

String getPreviousCursorId() {
return this.previousCursorIdGetter.apply(this.cursorId).orElse(null);
}
}

}
18 changes: 11 additions & 7 deletions api/src/main/java/io/kafbat/ui/service/MessagesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
consumerPosition,
getMsgFilter(containsStringFilter, filterId),
fixPageSize(limit)
fixPageSize(limit),
null
);
}

Expand All @@ -235,7 +236,8 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topi
cursor.deserializer(),
cursor.consumerPosition(),
cursor.filter(),
cursor.limit()
cursor.limit(),
cursorId
);
}

Expand All @@ -244,18 +246,20 @@ private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
int limit,
String cursorId) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit, cursorId));
}

private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
int limit,
String cursorId) {
var emitter = switch (consumerPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand All @@ -264,7 +268,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
deserializer,
filter,
cluster.getPollingSettings(),
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
);
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand All @@ -273,7 +277,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
deserializer,
filter,
cluster.getPollingSettings(),
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
);
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand Down
25 changes: 21 additions & 4 deletions api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import jakarta.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
Expand All @@ -20,23 +21,39 @@ public class PollingCursorsStorage {
.maximumSize(MAX_SIZE)
.build();

private final Cache<String, String> previousCursorsMap = CacheBuilder.newBuilder()
.maximumSize(MAX_SIZE)
.build();

public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
int limit,
@Nullable String cursorId) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId, this::register,
this::getPreviousCursorId);
}

public Optional<Cursor> getCursor(String id) {
return Optional.ofNullable(cursorsCache.getIfPresent(id));
}

public String register(Cursor cursor) {
public String register(Cursor nextCursor, @Nullable String currentCursorId) {
var id = RandomStringUtils.random(8, true, true);
cursorsCache.put(id, cursor);
cursorsCache.put(id, nextCursor);
if (currentCursorId != null) {
previousCursorsMap.put(id, currentCursorId);
}
return id;
}

public Optional<String> getPreviousCursorId(@Nullable String cursorId) {
if (cursorId == null) {
return Optional.empty();
}
return Optional.ofNullable(previousCursorsMap.getIfPresent(cursorId));
}

@VisibleForTesting
public Map<String, Cursor> asMap() {
return cursorsCache.asMap();
Expand Down
2 changes: 1 addition & 1 deletion api/src/test/java/io/kafbat/ui/emitter/CursorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
}

private Cursor.Tracking createCursor(ConsumerPosition position) {
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE, null);
}

private EnhancedConsumer createConsumer() {
Expand Down

0 comments on commit 6493775

Please sign in to comment.