diff --git a/src/main/java/io/vertx/mqtt/MqttEndpoint.java b/src/main/java/io/vertx/mqtt/MqttEndpoint.java index 8e1427c9..bb3e8458 100644 --- a/src/main/java/io/vertx/mqtt/MqttEndpoint.java +++ b/src/main/java/io/vertx/mqtt/MqttEndpoint.java @@ -352,6 +352,24 @@ public interface MqttEndpoint { @Fluent MqttEndpoint publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain); + /** + * Sends the PUBLISH message with specified messageId to the remote MQTT client + * + * @param topic topic on which the message is published + * @param payload message payload + * @param qosLevel quality of service level + * @param isDup if the message is a duplicate + * @param isRetain if the message needs to be retained + * @return a reference to this, so the API can be used fluently + */ + @Fluent + MqttEndpoint publishWithId(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain,int messageId); + + /** + * require an messageId + * @return the next messageId + */ + int requireMessageId(); /** * Sends the PINGRESP message to the remote MQTT client * diff --git a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java index af3bab5c..aaaa9c3b 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java @@ -425,13 +425,18 @@ public MqttEndpointImpl publishComplete(int publishMessageId) { } public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) { + publishWithId(topic, payload, qosLevel, isDup, isRetain, nextMessageId()); + return this; + } + @Override + public MqttEndpointImpl publishWithId(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId) { this.checkConnected(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qosLevel, isRetain, 0); MqttPublishVariableHeader variableHeader = - new MqttPublishVariableHeader(topic, this.nextMessageId()); + new MqttPublishVariableHeader(topic, messageId); ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes()); @@ -442,6 +447,11 @@ public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, return this; } + @Override + public int requireMessageId() { + return 0; + } + public MqttEndpointImpl pong() { this.checkConnected();