From 779017d6758c1916910eb8339176f5754d810425 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Travais?= Date: Wed, 7 Oct 2020 15:14:42 +0200 Subject: [PATCH] Use Netty's MqttMessageBuilders when possible --- .../io/vertx/mqtt/impl/MqttClientImpl.java | 94 ++++++------------- .../io/vertx/mqtt/impl/MqttEndpointImpl.java | 14 ++- .../test/server/MqttServerBadClientTest.java | 64 +++++-------- 3 files changed, 56 insertions(+), 116 deletions(-) diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java index a7f16a1c..24c6ff54 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java @@ -22,20 +22,19 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.mqtt.MqttConnectPayload; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttMessageFactory; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; -import io.netty.handler.codec.mqtt.MqttSubscribePayload; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; -import io.netty.handler.codec.mqtt.MqttUnsubscribePayload; +import io.netty.handler.codec.mqtt.MqttSubscribeMessage; +import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; @@ -63,13 +62,11 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; import static io.netty.handler.codec.mqtt.MqttQoS.*; @@ -86,8 +83,6 @@ public class MqttClientImpl implements MqttClient { private static final int MAX_MESSAGE_ID = 65535; private static final int MAX_TOPIC_LEN = 65535; private static final int MIN_TOPIC_LEN = 1; - private static final String PROTOCOL_NAME = "MQTT"; - private static final int PROTOCOL_VERSION = 4; private static final int DEFAULT_IDLE_TIMEOUT = 0; private final VertxInternal vertx; @@ -246,33 +241,20 @@ private Future doConnect(int port, String host, String serve // an exception at connection level soi.exceptionHandler(this::handleException); - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, - false, - AT_MOST_ONCE, - false, - 0); - - MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( - PROTOCOL_NAME, - PROTOCOL_VERSION, - options.hasUsername(), - options.hasPassword(), - options.isWillRetain(), - options.getWillQoS(), - options.isWillFlag(), - options.isCleanSession(), - options.getKeepAliveTimeSeconds() - ); - - MqttConnectPayload payload = new MqttConnectPayload( - options.getClientId() == null ? "" : options.getClientId(), - options.getWillTopic(), - options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, - options.hasUsername() ? options.getUsername() : null, - options.hasPassword() ? options.getPassword().getBytes() : null - ); - - io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + final MqttConnectMessage connect = MqttMessageBuilders.connect() + .hasUser(options.hasUsername()) + .hasPassword(options.hasPassword()) + .willRetain(options.isWillRetain()) + .willQoS(MqttQoS.valueOf(options.getWillQoS())) + .willFlag(options.isWillFlag()) + .willTopic(options.getWillTopic()) + .willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null) + .cleanSession(options.isCleanSession()) + .keepAlive(options.getKeepAliveTimeSeconds()) + .clientId(options.getClientId() == null ? "" : options.getClientId()) + .username(options.hasUsername() ? options.getUsername() : null) + .password(options.hasPassword() ? options.getPassword().getBytes() : null) + .build(); this.write(connect); } @@ -491,24 +473,12 @@ public Future subscribe(Map topics) { return ctx.failedFuture(exception); } - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.SUBSCRIBE, - false, - AT_LEAST_ONCE, - false, - 0); - - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); - List subscriptions = topics.entrySet() - .stream() - .map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue()))) - .collect(Collectors.toList()); - - MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions); - - io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + final MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe() + .messageId(nextMessageId()); + topics.forEach((topic, qos) -> subscribeBuilder.addSubscription(MqttQoS.valueOf(qos), topic)); + final MqttSubscribeMessage subscribe = subscribeBuilder.build(); - return this.write(subscribe).map(variableHeader.messageId()); + return this.write(subscribe).map(subscribe.variableHeader().messageId()); } /** @@ -558,22 +528,14 @@ public MqttClient unsubscribe(String topic, Handler> unsubs @Override public Future unsubscribe(String topic) { - MqttFixedHeader fixedHeader = new MqttFixedHeader( - MqttMessageType.UNSUBSCRIBE, - false, - AT_LEAST_ONCE, - false, - 0); - - MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId()); - - MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList())); - - io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + final MqttUnsubscribeMessage unsubscribe = MqttMessageBuilders.unsubscribe() + .addTopicFilter(topic) + .messageId(nextMessageId()) + .build(); this.write(unsubscribe); - return ctx.succeededFuture(variableHeader.messageId()); + return ctx.succeededFuture(unsubscribe.variableHeader().messageId()); } /** diff --git a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java index c49f4563..d1a27116 100644 --- a/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java +++ b/src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java @@ -18,9 +18,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttMessageFactory; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; @@ -30,7 +31,6 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.impl.logging.Logger; @@ -298,12 +298,10 @@ public MqttEndpointImpl exceptionHandler(Handler handler) { private MqttEndpointImpl connack(MqttConnectReturnCode returnCode, boolean sessionPresent) { - MqttFixedHeader fixedHeader = - new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); - MqttConnAckVariableHeader variableHeader = - new MqttConnAckVariableHeader(returnCode, sessionPresent); - - io.netty.handler.codec.mqtt.MqttMessage connack = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + MqttConnAckMessage connack = MqttMessageBuilders.connAck() + .returnCode(returnCode) + .sessionPresent(sessionPresent) + .build(); this.write(connack); diff --git a/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java b/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java index 3772e7f3..4e9b8fce 100644 --- a/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java +++ b/src/test/java/io/vertx/mqtt/test/server/MqttServerBadClientTest.java @@ -27,15 +27,10 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.mqtt.MqttConnectPayload; -import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttEncoder; -import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; -import io.netty.handler.codec.mqtt.MqttMessageFactory; -import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.util.CharsetUtil; import io.vertx.core.buffer.Buffer; @@ -57,8 +52,6 @@ @RunWith(VertxUnitRunner.class) public class MqttServerBadClientTest extends MqttServerBaseTest { - private static final String PROTOCOL_NAME = "MQTT"; - private static final int PROTOCOL_VERSION = 4; private static final String MQTT_TOPIC = "/my_topic"; private static final String MQTT_MESSAGE = "I'm a bad client"; @@ -182,45 +175,32 @@ public void unknownMessageType(TestContext context) { private final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); private MqttPublishMessage createPublishMessage() { - - MqttFixedHeader mqttFixedHeader = - new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0); - - MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(MQTT_TOPIC, 1); - ByteBuf payload = ALLOCATOR.buffer(); payload.writeBytes(MQTT_MESSAGE.getBytes(CharsetUtil.UTF_8)); - return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload); + return return MqttMessageBuilders.publish() + .qos(MqttQoS.AT_LEAST_ONCE) + .retained(true) + .topicName(MQTT_TOPIC) + .messageId(1) + .payload(payload) + .build(); } private MqttMessage createConnectPacket(MqttClientOptions options) { - MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, - false, - MqttQoS.AT_MOST_ONCE, - false, - 0); - - MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader( - PROTOCOL_NAME, - PROTOCOL_VERSION, - options.hasUsername(), - options.hasPassword(), - options.isWillRetain(), - options.getWillQoS(), - options.isWillFlag(), - options.isCleanSession(), - options.getKeepAliveTimeSeconds() - ); - - MqttConnectPayload payload = new MqttConnectPayload( - options.getClientId() == null ? "" : options.getClientId(), - options.getWillTopic(), - options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null, - options.hasUsername() ? options.getUsername() : null, - options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null - ); - - return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + return MqttMessageBuilders.connect() + .hasUser(options.hasUsername()) + .hasPassword(options.hasPassword()) + .willRetain(options.isWillRetain()) + .willQoS(MqttQoS.valueOf(options.getWillQoS())) + .willFlag(options.isWillFlag()) + .willTopic(options.getWillTopic()) + .willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null) + .cleanSession(options.isCleanSession()) + .keepAlive(options.getKeepAliveTimeSeconds()) + .clientId(options.getClientId() == null ? "" : options.getClientId()) + .username(options.hasUsername() ? options.getUsername() : null) + .password(options.hasPassword() ? options.getPassword().getBytes() : null) + .build(); } }