From 724e8e86b82178bef67b855ef0c00183bd829de8 Mon Sep 17 00:00:00 2001 From: caoqingguang Date: Sat, 3 Mar 2018 20:47:41 +0800 Subject: [PATCH] fixed #85 require an messageId and send message with it --- src/main/java/io/vertx/mqtt/MqttEndpoint.java | 18 ++++++++++++++++++ .../io/vertx/mqtt/impl/MqttEndpointImpl.java | 12 +++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/vertx/mqtt/MqttEndpoint.java b/src/main/java/io/vertx/mqtt/MqttEndpoint.java index 8e1427c9..bb3e8458 100644 --- a/src/main/java/io/vertx/mqtt/MqttEndpoint.java +++ b/src/main/java/io/vertx/mqtt/MqttEndpoint.java @@ -352,6 +352,24 @@ public interface MqttEndpoint { @Fluent MqttEndpoint publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain); + /** + * Sends the PUBLISH message with specified messageId to the remote MQTT client + * + * @param topic topic on which the message is published + * @param payload message payload + * @param qosLevel quality of service level + * @param isDup if the message is a duplicate + * @param isRetain if the message needs to be retained + * @return a reference to this, so the API can be used fluently + */ + @Fluent + MqttEndpoint publishWithId(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain,int messageId); + + /** + * require an messageId + * @return the next messageId + */ + int requireMessageId(); /** * Sends the PINGRESP message to the remote MQTT client * diff --git a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java index af3bab5c..aaaa9c3b 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java @@ -425,13 +425,18 @@ public MqttEndpointImpl publishComplete(int publishMessageId) { } public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) { + publishWithId(topic, payload, qosLevel, isDup, isRetain, nextMessageId()); + return this; + } + @Override + public MqttEndpointImpl publishWithId(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId) { this.checkConnected(); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qosLevel, isRetain, 0); MqttPublishVariableHeader variableHeader = - new MqttPublishVariableHeader(topic, this.nextMessageId()); + new MqttPublishVariableHeader(topic, messageId); ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes()); @@ -442,6 +447,11 @@ public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, return this; } + @Override + public int requireMessageId() { + return 0; + } + public MqttEndpointImpl pong() { this.checkConnected();