From a4e5c11db31b0f322981682fd8809da8afee6ae8 Mon Sep 17 00:00:00 2001 From: Nipuna Ranasinghe Date: Tue, 1 Aug 2023 13:58:45 +0530 Subject: [PATCH] FIx memory leak in receiver client --- .../org/ballerinax/asb/receiver/MessageReceiver.java | 12 +++++++++++- .../java/org/ballerinax/asb/util/ASBConstants.java | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java index 428c6178..698a2b1f 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java +++ b/asb-native/src/main/java/org/ballerinax/asb/receiver/MessageReceiver.java @@ -66,6 +66,7 @@ import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_ERROR_DESCRIPTION; import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_REASON; import static org.ballerinax.asb.util.ASBConstants.DEAD_LETTER_SOURCE; +import static org.ballerinax.asb.util.ASBConstants.DEFAULT_MESSAGE_LOCK_TOKEN; import static org.ballerinax.asb.util.ASBConstants.DELIVERY_COUNT; import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_SEQUENCE_NUMBER; import static org.ballerinax.asb.util.ASBConstants.ENQUEUED_TIME; @@ -281,6 +282,7 @@ public static Object complete(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.complete(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug("Completed the message(Id: " + message.getMessageId() + ") with lockToken " + lockToken); return null; } catch (BError e) { @@ -305,6 +307,7 @@ public static Object abandon(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.abandon(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done abandoning a message(Id: %s) using its lock token from \n%s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -336,6 +339,7 @@ public static Object deadLetter(BObject endpointClient, BString lockToken, Objec .setDeadLetterErrorDescription(ASBUtils.convertString(deadLetterErrorDescription)); options.setDeadLetterReason(ASBUtils.convertString(deadLetterReason)); receiver.deadLetter(message, options); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done dead-lettering a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -361,6 +365,7 @@ public static Object defer(BObject endpointClient, BString lockToken) { ServiceBusReceivedMessage message = (ServiceBusReceivedMessage) endpointClient .getNativeData(lockToken.getValue()); receiver.defer(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done deferring a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -417,6 +422,7 @@ public static Object renewLock(BObject endpointClient, BString lockToken) { .getNativeData(lockToken.getValue()); ServiceBusReceiverClient receiver = getReceiverFromBObject(endpointClient); receiver.renewMessageLock(message); + endpointClient.getNativeData().remove(lockToken.getValue()); LOGGER.debug(String.format("Done renewing a message(Id: %s) using its lock token from %s", message.getMessageId(), receiver.getEntityPath())); return null; @@ -471,7 +477,11 @@ private static BMap constructExpectedMessageRecord(BObject endp } else { map.put(BODY, messageBody); } - endpointClient.addNativeData(message.getLockToken(), message); + + // This is to avoid adding messages to the native data map, if the receive mode is 'RECEIVE_AND_DELETE'. + if (!message.getLockToken().equals(DEFAULT_MESSAGE_LOCK_TOKEN)) { + endpointClient.addNativeData(message.getLockToken(), message); + } return createBRecordValue(map, expectedType); } diff --git a/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java b/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java index 3a560ca7..1ab9fc84 100644 --- a/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java +++ b/asb-native/src/main/java/org/ballerinax/asb/util/ASBConstants.java @@ -69,6 +69,7 @@ public class ASBConstants { public static final String APPLICATION_PROPERTIES = "properties"; public static final int DEFAULT_TIME_TO_LIVE = 60; // In seconds + public static final String DEFAULT_MESSAGE_LOCK_TOKEN = "00000000-0000-0000-0000-000000000000"; // listener constant fields public static final String CONSUMER_SERVICES = "consumer_services";