Skip to content

Commit

Permalink
FIx memory leak in receiver client
Browse files Browse the repository at this point in the history
  • Loading branch information
NipunaRanasinghe committed Aug 1, 2023
1 parent 778571b commit a4e5c11
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_ERROR_DESCRIPTION;
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_REASON;
import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_SOURCE;
import static org.ballerinax.asb.util.ASBConstants.DEFAULT_MESSAGE_LOCK_TOKEN;
import static org.ballerinax.asb.util.ASBConstants.DELIVERY_COUNT;
import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_SEQUENCE_NUMBER;
import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_TIME;
Expand Down Expand Up @@ -281,6 +282,7 @@ public static Object complete(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.complete(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken);
return null;
} catch (BError e) {
Expand All @@ -305,6 +307,7 @@ public static Object abandon(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.abandon(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from \n%s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -336,6 +339,7 @@ public static Object deadLetter(BObject endpointClient, BString lockToken, Objec
.setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription));
options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason));
receiver.deadLetter(message, options);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand All @@ -361,6 +365,7 @@ public static Object defer(BObject endpointClient, BString lockToken) {
ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient
.getNativeData(lockToken.getValue());
receiver.defer(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -417,6 +422,7 @@ public static Object renewLock(BObject endpointClient, BString lockToken) {
.getNativeData(lockToken.getValue());
ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient);
receiver.renewMessageLock(message);
endpointClient.getNativeData().remove(lockToken.getValue());
LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s",
message.getMessageId(), receiver.getEntityPath()));
return null;
Expand Down Expand Up @@ -471,7 +477,11 @@ private static BMap<BString, Object> constructExpectedMessageRecord(BObject endp
} else {
map.put(BODY, messageBody);
}
endpointClient.addNativeData(message.getLockToken(), message);

// This is to avoid adding messages to the native data map, if the receive mode is 'RECEIVE_AND_DELETE'.
if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) {
endpointClient.addNativeData(message.getLockToken(), message);
}
return createBRecordValue(map, expectedType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ASBConstants {
public static final String APPLICATION_PROPERTIES = "properties";

public static final int DEFAULT_TIME_TO_LIVE = 60; // In seconds
public static final String DEFAULT_MESSAGE_LOCK_TOKEN = "00000000-0000-0000-0000-000000000000";

// listener constant fields
public static final String CONSUMER_SERVICES = "consumer_services";
Expand Down

0 comments on commit a4e5c11

Please sign in to comment.