diff --git a/consumer/src/main/java/com/flipkart/varadhi/consumer/UnGroupedMessageSrc.java b/consumer/src/main/java/com/flipkart/varadhi/consumer/UnGroupedMessageSrc.java index b6c72ae7..500720d1 100644 --- a/consumer/src/main/java/com/flipkart/varadhi/consumer/UnGroupedMessageSrc.java +++ b/consumer/src/main/java/com/flipkart/varadhi/consumer/UnGroupedMessageSrc.java @@ -38,7 +38,7 @@ public CompletableFuture nextMessages(MessageTracker[] messages) { // Our first priority is to drain the iterator if it is set and return immediately. // We do not want to proceed with consumer receiveAsync if we have messages in the iterator, // as a slow or empty consumer might block the flow and cause the iterator contents to be stuck. - int offset = fetchFromIterator(messages); + int offset = fetchFromIterator(ongoingIterator, messages, 0); if (offset > 0) { return CompletableFuture.completedFuture(offset); } @@ -49,40 +49,33 @@ public CompletableFuture nextMessages(MessageTracker[] messages) { // Therefore, we use the futureInProgress flag to limit the concurrency and ensure only one future is in progress at a time. if (futureInProgress.compareAndSet(false, true)) { return consumer.receiveAsync() - .thenApply(polledMessages -> processPolledMessages(offset, polledMessages, messages)) + .thenApply(polledMessages -> processPolledMessages(polledMessages, messages, offset)) .whenComplete((result, ex) -> futureInProgress.set( false)); // any of the above stages can complete exceptionally, so this is to ensure the flag is reset. } return CompletableFuture.completedFuture(0); } - private int processPolledMessages(int offset, PolledMessages polledMessages, MessageTracker[] messages) { - int i = offset; - - Iterator> polledMessagesIterator = polledMessages.iterator(); - while (polledMessagesIterator.hasNext() && i < messages.length) { - PolledMessage polledMessage = polledMessagesIterator.next(); - MessageTracker messageTracker = new PolledMessageTracker<>(consumer, polledMessage); - messages[i++] = messageTracker; - } - - if (polledMessagesIterator.hasNext()) { - ongoingIterator = polledMessagesIterator; - } - return i; + private int processPolledMessages(PolledMessages polledMessages, MessageTracker[] messages, int startIndex) { + ongoingIterator = polledMessages.iterator(); + return fetchFromIterator(ongoingIterator, messages, startIndex); } - private int fetchFromIterator(MessageTracker[] messages) { - if (ongoingIterator == null || !ongoingIterator.hasNext()) { - return 0; - } - - int i = 0; - while (i < messages.length && ongoingIterator.hasNext()) { - PolledMessage polledMessage = ongoingIterator.next(); - MessageTracker messageTracker = new PolledMessageTracker<>(consumer, polledMessage); - messages[i++] = messageTracker; + /** + * Fetches messages from the iterator and populates the message array. + * + * @param iterator Iterator of messages to fetch from. + * @param messages Array of message trackers to populate. + * @param startIndex Index into the messages array from where to start storing the messages. + * + * @return Index into the messages array where the next message should be stored. (will be equal to the length if completely full) + */ + private int fetchFromIterator( + Iterator> iterator, MessageTracker[] messages, int startIndex + ) { + while (iterator != null && iterator.hasNext() && startIndex < messages.length) { + messages[startIndex++] = new PolledMessageTracker<>(consumer, iterator.next()); } - return i; + return startIndex; } }