Skip to content

Commit

Permalink
Implement 7.3.6 and 7.3.9 - restriction on Message object
Browse files Browse the repository at this point in the history
  • Loading branch information
kenliao94 committed Dec 14, 2024
1 parent dbb78fb commit 75b141a
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,21 @@ protected void init() throws OpenDataException {
public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQMessage m = (ActiveMQMessage)o;
Map<String, Object> rc = super.getFields(o);
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", "" + m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
rc.put("JMSExpiration", m.getJMSExpiration());
rc.put("JMSPriority", m.getJMSPriority());
rc.put("JMSRedelivered", m.getJMSRedelivered());
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
try {
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", "" + m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
rc.put("JMSExpiration", m.getJMSExpiration());
rc.put("JMSPriority", m.getJMSPriority());
rc.put("JMSRedelivered", m.getJMSRedelivered());
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
} catch (JMSException e) {
throw new OpenDataException(e.getMessage());
}

rc.put(CompositeDataConstants.JMSXGROUP_ID, m.getGroupID());
rc.put(CompositeDataConstants.JMSXGROUP_SEQ, m.getGroupSequence());
rc.put(CompositeDataConstants.JMSXUSER_ID, m.getUserID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2136,7 +2136,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
msg.setBrokerPath(null);

msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
final ActiveMQMessage originalMessage = msg;
if (connection.isCopyMessageOnSend() || completionListener != null) {
// We need to make the message inaccessible per Jakarta Messaging 3.1 - 7.3.6 & 7.3.9
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#restrictions-on-the-use-of-the-message-object
// To do that, we need to set a flag in the message referenced in sender thread. To avoid making
// the message inaccessible once received on the server side (even tho the WireFormat marshaller doesn't marshal that field, so it shouldn't matter)
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
Expand Down Expand Up @@ -2167,10 +2172,12 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin
// Make the Message object unaccessible and unmutable
// per Jakarta Messaging 3.1 spec section 7.3.9 and 7.3.6
numIncompletedAsyncSend.doIncrement();
originalMessage.setMessageAccessible(false);
wrapperCompletionListener = new CompletionListener() {
@Override
public void onCompletion(Message message) {
try {
originalMessage.setMessageAccessible(true);
inCompletionListenerCallback.set(true);
producerInCompletionListenerCallback.set(true);
numIncompletedAsyncSend.doDecrement();
Expand All @@ -2188,6 +2195,7 @@ public void onCompletion(Message message) {
@Override
public void onException(Message message, Exception e) {
try {
originalMessage.setMessageAccessible(true);
inCompletionListenerCallback.set(true);
completionListener.onException(message, e);
} finally {
Expand Down
Loading

0 comments on commit 75b141a

Please sign in to comment.