Skip to content

Commit

Permalink
Fixing invalid message state transition
Browse files Browse the repository at this point in the history
Resolves wso2#1005
  • Loading branch information
isuru jayathilake authored and tharinduwijewardane committed Feb 22, 2019
1 parent 3bcc2ec commit 4b8715e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public enum MessageStatus {
*/
SCHEDULED_TO_SEND,

/**
* Message has been recovered by the client it was sent to
*/
RECOVERED,

/**
* In a topic scenario, all subscribed consumers have acknowledged receipt of message
*/
Expand Down Expand Up @@ -127,9 +132,12 @@ public boolean isValidPreviousState(MessageStatus previousState) {
NO_MATCHING_CONSUMER.next = EnumSet.of(DLC_MESSAGE);
NO_MATCHING_CONSUMER.previous = EnumSet.of(BUFFERED);

SCHEDULED_TO_SEND.next = EnumSet.of(EXPIRED, ACKED_BY_ALL, BUFFERED, DLC_MESSAGE, SLOT_RETURNED);
SCHEDULED_TO_SEND.next = EnumSet.of(EXPIRED, RECOVERED, ACKED_BY_ALL, BUFFERED, DLC_MESSAGE, SLOT_RETURNED);
SCHEDULED_TO_SEND.previous = EnumSet.of(BUFFERED);

RECOVERED.next = EnumSet.of(SCHEDULED_TO_SEND, EXPIRED, DLC_MESSAGE, SLOT_RETURNED);
RECOVERED.previous = EnumSet.of(SCHEDULED_TO_SEND);

ACKED_BY_ALL.next = EnumSet.of(PREPARED_TO_DELETE, SLOT_RETURNED);
ACKED_BY_ALL.previous = EnumSet.of(SCHEDULED_TO_SEND);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public int deliverMessageToSubscriptions(StorageQueue storageQueue) throws
break;
}

message.markAsScheduledToDeliver(subscriptionsToDeliver);

iterator.remove();
for (AndesSubscription localSubscription : subscriptionsToDeliver) {
MessageFlusher.getInstance().deliverMessageAsynchronously(localSubscription, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ public void rebufferUnackedMessages() throws AndesException {
*/
private void reDeliverMessage(DeliverableAndesMetadata messageMetadata) throws AndesException {
if (!messageMetadata.isOKToDispose()) {
messageMetadata.addMessageStatus(MessageStatus.RECOVERED);
MessageFlusher.getInstance().scheduleMessageForSubscription(this, messageMetadata);
MessageTracer.trace(messageMetadata, MessageTracer.MESSAGE_REQUEUED_SUBSCRIBER);
} else {
Expand Down

0 comments on commit 4b8715e

Please sign in to comment.