From b4f69458f811b0a49e91d80788cc1ecaeaf84f86 Mon Sep 17 00:00:00 2001 From: Serge Lacourte Date: Fri, 28 Apr 2023 13:37:50 +0200 Subject: [PATCH] Fix bug #994 - manualAcks for QoS 2 messages Signed-off-by: Serge Lacourte --- .../mqttv5/client/internal/ClientState.java | 23 +++++++++++-------- .../mqttv5/client/internal/CommsCallback.java | 15 ++++++++---- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java index 216d62c2d..6ae922a5c 100644 --- a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java +++ b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/ClientState.java @@ -1143,12 +1143,20 @@ protected void handleInboundPubRel(MqttPubRel pubRel) throws MqttException { new Object[] { pubRel.getMessageId(), pubRel.toString(), pubRel.getReasonCodes()[0] }); throw new MqttException(pubRel.getReasonCodes()[0]); } else { - // Currently this client has no need of the properties, so this is left empty. - MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, pubRel.getMessageId(), - new MqttProperties()); - // @TRACE 668=Creating MqttPubComp: {0} - log.info(CLASS_NAME, methodName, "668", new Object[] { pubComp.toString() }); - this.send(pubComp, null); + MqttPublish sendMsg = (MqttPublish) inboundQoS2.get(Integer.valueOf(pubRel.getMessageId())); + if (sendMsg != null) { + if (callback != null) { + callback.messageArrived(sendMsg); + } + } else { + // Original publish has already been delivered. + // Currently this client has no need of the properties, so this is left empty. + MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, + pubRel.getMessageId(), new MqttProperties()); + // @TRACE 668=Creating MqttPubComp: {0} + log.info(CLASS_NAME, methodName, "668", new Object[] { pubComp.toString() }); + this.send(pubComp, null); + } } } @@ -1219,9 +1227,6 @@ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException { case 2: persistence.put(getReceivedPersistenceKey(message), (MqttPublish) message); inboundQoS2.put(Integer.valueOf(send.getMessageId()), send); - if (callback != null) { - callback.messageArrived(send); - } // Currently this client has no need of the properties, so this is left empty. this.send(new MqttPubRec(MqttReturnCode.RETURN_CODE_SUCCESS, send.getMessageId(), new MqttProperties()), null); diff --git a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java index 7e7cdd7a3..b89300385 100644 --- a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java +++ b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/CommsCallback.java @@ -481,10 +481,17 @@ private void handleMessage(MqttPublish publishMessage) throws Exception { deliverMessage(destName, publishMessage.getMessageId(), publishMessage.getMessage()); // If we are not in manual ACK mode: - if (!this.manualAcks && publishMessage.getMessage().getQos() == 1) { - this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, - publishMessage.getMessageId(), new MqttProperties()), - new MqttToken(clientComms.getClient().getClientId())); + if (!this.manualAcks) { + if (publishMessage.getMessage().getQos() == 1) { + this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS, + publishMessage.getMessageId(), new MqttProperties()), + new MqttToken(clientComms.getClient().getClientId())); + } else if (publishMessage.getMessage().getQos() == 2) { + this.clientComms.deliveryComplete(publishMessage.getMessageId()); + MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, + publishMessage.getMessageId(), new MqttProperties()); + this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId())); + } } }