Skip to content

Commit

Permalink
refactor fetch from iterator code in ungrouped src and reduce code du…
Browse files Browse the repository at this point in the history
…plication
  • Loading branch information
AayuStark007 committed May 22, 2024
1 parent 64cd3d2 commit bce4e4c
Showing 1 changed file with 20 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
// Our first priority is to drain the iterator if it is set and return immediately.
// We do not want to proceed with consumer receiveAsync if we have messages in the iterator,
// as a slow or empty consumer might block the flow and cause the iterator contents to be stuck.
int offset = fetchFromIterator(messages);
int offset = fetchFromIterator(ongoingIterator, messages, 0);
if (offset > 0) {
return CompletableFuture.completedFuture(offset);
}
Expand All @@ -49,40 +49,33 @@ public CompletableFuture<Integer> nextMessages(MessageTracker[] messages) {
// Therefore, we use the futureInProgress flag to limit the concurrency and ensure only one future is in progress at a time.
if (futureInProgress.compareAndSet(false, true)) {
return consumer.receiveAsync()
.thenApply(polledMessages -> processPolledMessages(offset, polledMessages, messages))
.thenApply(polledMessages -> processPolledMessages(polledMessages, messages, offset))
.whenComplete((result, ex) -> futureInProgress.set(
false)); // any of the above stages can complete exceptionally, so this is to ensure the flag is reset.
}
return CompletableFuture.completedFuture(0);
}

private int processPolledMessages(int offset, PolledMessages<O> polledMessages, MessageTracker[] messages) {
int i = offset;

Iterator<PolledMessage<O>> polledMessagesIterator = polledMessages.iterator();
while (polledMessagesIterator.hasNext() && i < messages.length) {
PolledMessage<O> polledMessage = polledMessagesIterator.next();
MessageTracker messageTracker = new PolledMessageTracker<>(consumer, polledMessage);
messages[i++] = messageTracker;
}

if (polledMessagesIterator.hasNext()) {
ongoingIterator = polledMessagesIterator;
}
return i;
private int processPolledMessages(PolledMessages<O> polledMessages, MessageTracker[] messages, int startIndex) {
ongoingIterator = polledMessages.iterator();
return fetchFromIterator(ongoingIterator, messages, startIndex);
}

private int fetchFromIterator(MessageTracker[] messages) {
if (ongoingIterator == null || !ongoingIterator.hasNext()) {
return 0;
}

int i = 0;
while (i < messages.length && ongoingIterator.hasNext()) {
PolledMessage<O> polledMessage = ongoingIterator.next();
MessageTracker messageTracker = new PolledMessageTracker<>(consumer, polledMessage);
messages[i++] = messageTracker;
/**
* Fetches messages from the iterator and populates the message array.
*
* @param iterator Iterator of messages to fetch from.
* @param messages Array of message trackers to populate.
* @param startIndex Index into the messages array from where to start storing the messages.
*
* @return Index into the messages array where the next message should be stored. (will be equal to the length if completely full)
*/
private int fetchFromIterator(
Iterator<PolledMessage<O>> iterator, MessageTracker[] messages, int startIndex
) {
while (iterator != null && iterator.hasNext() && startIndex < messages.length) {
messages[startIndex++] = new PolledMessageTracker<>(consumer, iterator.next());
}
return i;
return startIndex;
}
}

0 comments on commit bce4e4c

Please sign in to comment.