-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
making fixes to grouped message src, so that nextMessages always retu… #145
making fixes to grouped message src, so that nextMessages always retu… #145
Conversation
…rns with messages asynchronously. previously it could return with 0 msgs and used to prioritize fetching messages from consumer
…one on consumer thread only.
@@ -158,7 +214,7 @@ private void free(String groupId, MessageConsumptionStatus status) { | |||
throw new IllegalStateException(String.format("Tried to free group %s: %s", gId, tracker)); | |||
} | |||
var messages = tracker.messages; | |||
if (!messages.isEmpty() && messages.getFirst().remaining() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using while
to make sure that messages batch with empty entries are not going to be put back in the hashmap.
return replenishAvailableGroups().thenApply(v -> nextMessagesInternal(messages)); | ||
int count = nextMessagesInternal(messages); | ||
if (count > 0) { | ||
return CompletableFuture.completedFuture(count); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prioritizing the array filling.
If no message present / no free group present. try to fetch fresh messages.
And this is the difference, There are 2 conditions, which can unblock the "array filling". the fetch returning with new messages.
and the group getting freed up.
private void tryCompletePendingRequest() { | ||
NextMsgsRequest request; | ||
if ((request = pendingRequest.getAndSet(null)) != null) { | ||
request.result.complete(nextMessagesInternal(request.messages)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is still a chance, where we still end up completing the request with 0 messages, but this should be much rarer now.
#145) Making fixes to grouped message src, so that nextMessages always returns with messages asynchronously. previously it could return with 0 msgs and used to prioritize fetching messages from consumer
…rns with messages asynchronously. previously it could return with 0 msgs and used to prioritize fetching messages from consumer