Skip to content

Commit

Permalink
Use Netty's MqttMessageBuilders when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
François Travais committed Oct 7, 2020
1 parent f3d109b commit 779017d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 116 deletions.
94 changes: 28 additions & 66 deletions src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
Expand Down Expand Up @@ -63,13 +62,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.netty.handler.codec.mqtt.MqttQoS.*;

Expand All @@ -86,8 +83,6 @@ public class MqttClientImpl implements MqttClient {
private static final int MAX_MESSAGE_ID = 65535;
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;

private final VertxInternal vertx;
Expand Down Expand Up @@ -246,33 +241,20 @@ private Future<MqttConnAckMessage> doConnect(int port, String host, String serve
// an exception at connection level
soi.exceptionHandler(this::handleException);

MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
AT_MOST_ONCE,
false,
0);

MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveTimeSeconds()
);

MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes() : null
);

io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttConnectMessage connect = MqttMessageBuilders.connect()
.hasUser(options.hasUsername())
.hasPassword(options.hasPassword())
.willRetain(options.isWillRetain())
.willQoS(MqttQoS.valueOf(options.getWillQoS()))
.willFlag(options.isWillFlag())
.willTopic(options.getWillTopic())
.willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null)
.cleanSession(options.isCleanSession())
.keepAlive(options.getKeepAliveTimeSeconds())
.clientId(options.getClientId() == null ? "" : options.getClientId())
.username(options.hasUsername() ? options.getUsername() : null)
.password(options.hasPassword() ? options.getPassword().getBytes() : null)
.build();

this.write(connect);
}
Expand Down Expand Up @@ -491,24 +473,12 @@ public Future<Integer> subscribe(Map<String, Integer> topics) {
return ctx.failedFuture(exception);
}

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
.map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
.collect(Collectors.toList());

MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);

io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttMessageBuilders.SubscribeBuilder subscribeBuilder = MqttMessageBuilders.subscribe()
.messageId(nextMessageId());
topics.forEach((topic, qos) -> subscribeBuilder.addSubscription(MqttQoS.valueOf(qos), topic));
final MqttSubscribeMessage subscribe = subscribeBuilder.build();

return this.write(subscribe).map(variableHeader.messageId());
return this.write(subscribe).map(subscribe.variableHeader().messageId());
}

/**
Expand Down Expand Up @@ -558,22 +528,14 @@ public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubs
@Override
public Future<Integer> unsubscribe(String topic) {

MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);

MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(nextMessageId());

MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));

io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
final MqttUnsubscribeMessage unsubscribe = MqttMessageBuilders.unsubscribe()
.addTopicFilter(topic)
.messageId(nextMessageId())
.build();

this.write(unsubscribe);

return ctx.succeededFuture(variableHeader.messageId());
return ctx.succeededFuture(unsubscribe.variableHeader().messageId());
}

/**
Expand Down
14 changes: 6 additions & 8 deletions src/main/java/io/vertx/mqtt/impl/MqttEndpointImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
Expand All @@ -30,7 +31,6 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
Expand Down Expand Up @@ -298,12 +298,10 @@ public MqttEndpointImpl exceptionHandler(Handler<Throwable> handler) {

private MqttEndpointImpl connack(MqttConnectReturnCode returnCode, boolean sessionPresent) {

MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttConnAckVariableHeader variableHeader =
new MqttConnAckVariableHeader(returnCode, sessionPresent);

io.netty.handler.codec.mqtt.MqttMessage connack = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
MqttConnAckMessage connack = MqttMessageBuilders.connAck()
.returnCode(returnCode)
.sessionPresent(sessionPresent)
.build();

this.write(connack);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,10 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.CharsetUtil;
import io.vertx.core.buffer.Buffer;
Expand All @@ -57,8 +52,6 @@
@RunWith(VertxUnitRunner.class)
public class MqttServerBadClientTest extends MqttServerBaseTest {

private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final String MQTT_TOPIC = "/my_topic";
private static final String MQTT_MESSAGE = "I'm a bad client";

Expand Down Expand Up @@ -182,45 +175,32 @@ public void unknownMessageType(TestContext context) {
private final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false);

private MqttPublishMessage createPublishMessage() {

MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);

MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(MQTT_TOPIC, 1);

ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes(MQTT_MESSAGE.getBytes(CharsetUtil.UTF_8));

return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
return return MqttMessageBuilders.publish()
.qos(MqttQoS.AT_LEAST_ONCE)
.retained(true)
.topicName(MQTT_TOPIC)
.messageId(1)
.payload(payload)
.build();
}

private MqttMessage createConnectPacket(MqttClientOptions options) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
MqttQoS.AT_MOST_ONCE,
false,
0);

MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveTimeSeconds()
);

MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes(StandardCharsets.UTF_8) : null
);

return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
return MqttMessageBuilders.connect()
.hasUser(options.hasUsername())
.hasPassword(options.hasPassword())
.willRetain(options.isWillRetain())
.willQoS(MqttQoS.valueOf(options.getWillQoS()))
.willFlag(options.isWillFlag())
.willTopic(options.getWillTopic())
.willMessage(options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null)
.cleanSession(options.isCleanSession())
.keepAlive(options.getKeepAliveTimeSeconds())
.clientId(options.getClientId() == null ? "" : options.getClientId())
.username(options.hasUsername() ? options.getUsername() : null)
.password(options.hasPassword() ? options.getPassword().getBytes() : null)
.build();
}
}

0 comments on commit 779017d

Please sign in to comment.