Skip to content

Commit

Permalink
Fix bug eclipse-paho#994 - manualAcks for QoS 2 messages
Browse files Browse the repository at this point in the history
Signed-off-by: Serge Lacourte <[email protected]>
(cherry picked from commit b4f6945)
  • Loading branch information
lacourte committed Apr 28, 2023
1 parent 570ed6e commit 79cbad3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1143,12 +1143,20 @@ protected void handleInboundPubRel(MqttPubRel pubRel) throws MqttException {
new Object[] { pubRel.getMessageId(), pubRel.toString(), pubRel.getReasonCodes()[0] });
throw new MqttException(pubRel.getReasonCodes()[0]);
} else {
// Currently this client has no need of the properties, so this is left empty.
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS, pubRel.getMessageId(),
new MqttProperties());
// @TRACE 668=Creating MqttPubComp: {0}
log.info(CLASS_NAME, methodName, "668", new Object[] { pubComp.toString() });
this.send(pubComp, null);
MqttPublish sendMsg = (MqttPublish) inboundQoS2.get(Integer.valueOf(pubRel.getMessageId()));
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
}
} else {
// Original publish has already been delivered.
// Currently this client has no need of the properties, so this is left empty.
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS,
pubRel.getMessageId(), new MqttProperties());
// @TRACE 668=Creating MqttPubComp: {0}
log.info(CLASS_NAME, methodName, "668", new Object[] { pubComp.toString() });
this.send(pubComp, null);
}
}
}

Expand Down Expand Up @@ -1219,9 +1227,6 @@ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
case 2:
persistence.put(getReceivedPersistenceKey(message), (MqttPublish) message);
inboundQoS2.put(Integer.valueOf(send.getMessageId()), send);
if (callback != null) {
callback.messageArrived(send);
}
// Currently this client has no need of the properties, so this is left empty.
this.send(new MqttPubRec(MqttReturnCode.RETURN_CODE_SUCCESS, send.getMessageId(),
new MqttProperties()), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,17 @@ private void handleMessage(MqttPublish publishMessage) throws Exception {
deliverMessage(destName, publishMessage.getMessageId(), publishMessage.getMessage());

// If we are not in manual ACK mode:
if (!this.manualAcks && publishMessage.getMessage().getQos() == 1) {
this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS,
publishMessage.getMessageId(), new MqttProperties()),
new MqttToken(clientComms.getClient().getClientId()));
if (!this.manualAcks) {
if (publishMessage.getMessage().getQos() == 1) {
this.clientComms.internalSend(new MqttPubAck(MqttReturnCode.RETURN_CODE_SUCCESS,
publishMessage.getMessageId(), new MqttProperties()),
new MqttToken(clientComms.getClient().getClientId()));
} else if (publishMessage.getMessage().getQos() == 2) {
this.clientComms.deliveryComplete(publishMessage.getMessageId());
MqttPubComp pubComp = new MqttPubComp(MqttReturnCode.RETURN_CODE_SUCCESS,
publishMessage.getMessageId(), new MqttProperties());
this.clientComms.internalSend(pubComp, new MqttToken(clientComms.getClient().getClientId()));
}
}
}

Expand Down

0 comments on commit 79cbad3

Please sign in to comment.