Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][AMQ-8324][Jakarta Messaging 3.1] Asynchronous send with CompletionListener #1364

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,21 @@ protected void init() throws OpenDataException {
public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQMessage m = (ActiveMQMessage)o;
Map<String, Object> rc = super.getFields(o);
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", "" + m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
rc.put("JMSExpiration", m.getJMSExpiration());
rc.put("JMSPriority", m.getJMSPriority());
rc.put("JMSRedelivered", m.getJMSRedelivered());
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
try {
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", "" + m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
rc.put("JMSReplyTo",toString(m.getJMSReplyTo()));
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
rc.put("JMSExpiration", m.getJMSExpiration());
rc.put("JMSPriority", m.getJMSPriority());
rc.put("JMSRedelivered", m.getJMSRedelivered());
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
} catch (JMSException e) {
throw new OpenDataException(e.getMessage());
}

rc.put(CompositeDataConstants.JMSXGROUP_ID, m.getGroupID());
rc.put(CompositeDataConstants.JMSXGROUP_SEQ, m.getGroupSequence());
rc.put(CompositeDataConstants.JMSXUSER_ID, m.getUserID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import jakarta.jms.CompletionListener;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionConsumer;
import jakarta.jms.ConnectionMetaData;
Expand Down Expand Up @@ -1439,6 +1440,68 @@ public void onCompletion(FutureResponse resp) {
}
}

/**
* Send a packet through a Connection - for internal use only
*
* @param command
*
* @throws JMSException
*/
public void syncSendPacket(final Command command, final CompletionListener completionListener) throws JMSException {
if(completionListener==null) {
syncSendPacket(command);
} else {
if (isClosed()) {
throw new ConnectionClosedException();
}
try {
this.transport.asyncRequest(command, resp -> {
Response response;
Throwable exception = null;
try {
response = resp.getResult();
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
exception = er.getException();
}
} catch (Exception e) {
exception = e;
}
if (exception != null) {
if ( exception instanceof JMSException) {
completionListener.onException((jakarta.jms.Message) command, (JMSException) exception);
} else {
if (isClosed() || closing.get()) {
LOG.debug("Received an exception but connection is closing");
}
JMSException jmsEx = null;
try {
jmsEx = JMSExceptionSupport.create(exception);
} catch(Throwable e) {
LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
}
// dispose of transport for security exceptions on connection initiation
if (exception instanceof SecurityException && command instanceof ConnectionInfo){
try {
forceCloseOnSecurityException(exception);
} catch (Throwable t) {
// We throw the original error from the ExceptionResponse instead.
}
}
if (jmsEx != null) {
completionListener.onException((jakarta.jms.Message) command, jmsEx);
}
}
} else {
completionListener.onCompletion((jakarta.jms.Message) command);
}
});
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}

private void forceCloseOnSecurityException(Throwable exception) {
LOG.trace("force close on security exception:{}, transport={}", this, transport, exception);
onException(new IOException("Force close due to SecurityException on connect", exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import jakarta.jms.CompletionListener;
import jakarta.jms.Destination;
import jakarta.jms.IllegalStateException;
import jakarta.jms.IllegalStateRuntimeException;
import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import jakarta.jms.Message;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
Expand All @@ -35,6 +35,7 @@
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.CountdownLock;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,11 +84,14 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
private final long startTime;
private MessageTransformer transformer;
private MemoryUsage producerWindow;
private final ThreadLocal<Boolean> inCompletionListenerCallback = new ThreadLocal<>();
private final CountdownLock numIncompleteAsyncSend = new CountdownLock();

protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
super(session);
this.info = new ProducerInfo(producerId);
this.info.setWindowSize(session.connection.getProducerWindowSize());
inCompletionListenerCallback.set(false);
// Allows the options on the destination to configure the producerInfo
if (destination != null && destination.getOptions() != null) {
Map<String, Object> options = IntrospectionSupport.extractProperties(
Expand Down Expand Up @@ -168,6 +172,10 @@ public Destination getDestination() throws JMSException {
*/
@Override
public void close() throws JMSException {
if (inCompletionListenerCallback != null && inCompletionListenerCallback.get()) {
throw new IllegalStateRuntimeException("Can't close message producer within CompletionListener");
}
waitForAsyncSendToFinish();
if (!closed) {
dispose();
this.session.asyncSendPacket(info.createRemoveCommand());
Expand Down Expand Up @@ -227,7 +235,7 @@ public void send(Destination destination, Message message, int deliveryMode, int
/**
*
* @param message the message to send
* @param CompletionListener to callback
* @param completionListener to callback
* @throws JMSException if the JMS provider fails to send the message due to
* some internal error.
* @throws UnsupportedOperationException if an invalid destination is
Expand All @@ -239,27 +247,88 @@ public void send(Destination destination, Message message, int deliveryMode, int
*/
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported");
this.send(getDestination(),
message,
defaultDeliveryMode,
defaultPriority,
defaultTimeToLive,
completionListener);
}


@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive,
CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
this.send(this.getDestination(),
message,
deliveryMode,
priority,
timeToLive,
completionListener);
}

@Override
public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported");
this.send(destination,
message,
defaultDeliveryMode,
defaultPriority,
defaultTimeToLive,
completionListener);
}

@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
CompletionListener completionListener) throws JMSException {
throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported");
this.send(destination, message, deliveryMode, priority, timeToLive,
getDisableMessageID(), getDisableMessageTimestamp(), completionListener);
}

public void send(Message message, AsyncCallback onComplete) throws JMSException {
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException {
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}

ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}

if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}

if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}

this.session.send(this, dest, message, deliveryMode, priority, timeToLive, disableMessageID,
disableMessageTimestamp, producerWindow, sendTimeout, completionListener, inCompletionListenerCallback, numIncompleteAsyncSend);

stats.onMessage();
}


public void send(Message message, AsyncCallback onComplete) throws JMSException {
this.send(this.getDestination(),
message,
this.defaultDeliveryMode,
Expand Down Expand Up @@ -389,4 +458,7 @@ public void onProducerAck(ProducerAck pa) {
}
}

private void waitForAsyncSendToFinish() {
numIncompleteAsyncSend.doWaitForZero();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class ActiveMQProducer implements JMSProducer {

// Properties applied to all messages on a per-JMS producer instance basis
private Map<String, Object> messageProperties = null;
private CompletionListener completionListener = null;

ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) {
this.activemqContext = activemqContext;
Expand Down Expand Up @@ -86,8 +87,7 @@ public JMSProducer send(Destination destination, Message message) {
message.setObjectProperty(propertyEntry.getKey(), propertyEntry.getValue());
}
}

activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null);
activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), getAsync());
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
Expand Down Expand Up @@ -253,12 +253,13 @@ public long getDeliveryDelay() {

@Override
public JMSProducer setAsync(CompletionListener completionListener) {
throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported");
this.completionListener = completionListener;
return this;
}

@Override
public CompletionListener getAsync() {
throw new UnsupportedOperationException("getAsync() is not supported");
return this.completionListener;
}

@Override
Expand Down
Loading