> unsubscribeSentHandler);
+
/**
* Sets handler which will be called after PINGRESP packet receiving
*
diff --git a/src/main/java/io/vertx/mqtt/MqttClientSession.java b/src/main/java/io/vertx/mqtt/MqttClientSession.java
new file mode 100644
index 00000000..68c5a680
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/MqttClientSession.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.codegen.annotations.Fluent;
+import io.vertx.codegen.annotations.GenIgnore;
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.impl.MqttClientSessionImpl;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import io.vertx.mqtt.session.RequestedQoS;
+import io.vertx.mqtt.session.SessionEvent;
+import io.vertx.mqtt.session.SessionState;
+import io.vertx.mqtt.session.SubscriptionEvent;
+
+/**
+ * An MQTT client session.
+ */
+@VertxGen
+public interface MqttClientSession {
+
+ /**
+ * Create a new MQTT client session.
+ *
+ * The session will initially be disconnected, and must be started using {@link #start()}.
+ *
+ * @param vertx Vert.x instance
+ * @param options MQTT client session options
+ * @return MQTT client session instance
+ */
+ static MqttClientSession create(Vertx vertx, MqttClientSessionOptions options) {
+ return new MqttClientSessionImpl(vertx, options);
+ }
+
+ /**
+ * Set the session state handler.
+ *
+ * @param sessionStateHandler The new handler, will overwrite the old one.
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ MqttClientSession sessionStateHandler(Handler sessionStateHandler);
+
+ /**
+ * Set the subscription state handler.
+ *
+ * @param subscriptionStateHandler The new handler, will overwrite the old one.
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ MqttClientSession subscriptionStateHandler(Handler subscriptionStateHandler);
+
+ /**
+ * Set the publish complete handler.
+ *
+ * @param publishCompleteHandler The new handler, will overwrite the old one.
+ * @return current MQTT client session instance
+ * @see MqttClient#publishCompletionHandler(Handler)
+ */
+ @Fluent
+ MqttClientSession publishCompletionHandler(Handler publishCompleteHandler);
+
+ /**
+ * Set the publish completion expiration handler.
+ *
+ * @param publishCompletionExpirationHandler The new handler, will overwrite the old one.
+ * @return current MQTT client session instance
+ * @see MqttClient#publishCompletionExpirationHandler(Handler)
+ */
+ @Fluent
+ MqttClientSession publishCompletionExpirationHandler(Handler publishCompletionExpirationHandler);
+
+ /**
+ * Set the publish completion unknown packet id handler.
+ *
+ * @param publishCompletionUnknownPacketIdHandler The new handler, will overwrite the old one.
+ * @return current MQTT client session instance
+ * @see MqttClient#publishCompletionUnknownPacketIdHandler(Handler)
+ */
+ @Fluent
+ MqttClientSession publishCompletionUnknownPacketIdHandler(Handler publishCompletionUnknownPacketIdHandler);
+
+ /**
+ * Start the session. This will try to drive the connection to {@link SessionState#CONNECTED}.
+ */
+ void start();
+
+ /**
+ * Stop the session. This will try to drive the connection to {@link SessionState#DISCONNECTED}.
+ */
+ void stop();
+
+ /**
+ * Subscribes to the topics with related QoS levels
+ *
+ * @param topics topics and related QoS levels to subscribe to
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ MqttClientSession subscribe(Map topics);
+
+ /**
+ * Subscribes to a single topic with related QoS level.
+ *
+ * @param topic The topic to subscribe to.
+ * @param qos The QoS to request from the server.
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ default MqttClientSession subscribe(String topic, RequestedQoS qos) {
+ return subscribe(Collections.singletonMap(topic, qos));
+ }
+
+ /**
+ * Subscribes to a list of topics, with the same QoS.
+ *
+ * @param qos The QoS to use.
+ * @param topics The topics to subscribe to.
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ @GenIgnore
+ default MqttClientSession subscribe(RequestedQoS qos, String... topics) {
+ final Map topicMap = new LinkedHashMap<>(topics.length);
+ for (String topic : topics) {
+ topicMap.put(topic, qos);
+ }
+ return subscribe(topicMap);
+ }
+
+ /**
+ * Unsubscribe from receiving messages on given topics
+ *
+ * @param topics Topics you want to unsubscribe from
+ * @return current MQTT client session instance
+ */
+ MqttClientSession unsubscribe(List topics);
+
+ /**
+ * Unsubscribe from receiving messages on given topics
+ *
+ * @param topics Topics you want to unsubscribe from
+ * @return current MQTT client session instance
+ */
+ @GenIgnore
+ default MqttClientSession unsubscribe(String... topics) {
+ return unsubscribe(Arrays.asList(topics));
+ }
+
+ /**
+ * Sets handler which will be called each time server publish something to client
+ *
+ * @param messageHandler handler to call
+ * @return current MQTT client session instance
+ */
+ @Fluent
+ MqttClientSession messageHandler(Handler messageHandler);
+
+ /**
+ * Sends the PUBLISH message to the remote MQTT server
+ *
+ * @param topic topic on which the message is published
+ * @param payload message payload
+ * @param qosLevel QoS level
+ * @param isDup if the message is a duplicate
+ * @param isRetain if the message needs to be retained
+ * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0)
+ */
+ Future publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain);
+
+ /**
+ * Sends the PUBLISH message to the remote MQTT server
+ *
+ * @param topic topic on which the message is published
+ * @param payload message payload
+ * @param qosLevel QoS level
+ * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0)
+ */
+ default Future publish(String topic, Buffer payload, MqttQoS qosLevel) {
+ return publish(topic, payload, qosLevel, false, false);
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/MqttClientSessionOptions.java b/src/main/java/io/vertx/mqtt/MqttClientSessionOptions.java
new file mode 100644
index 00000000..ef678d1b
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/MqttClientSessionOptions.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt;
+
+import io.vertx.codegen.annotations.DataObject;
+import io.vertx.core.json.JsonObject;
+import io.vertx.mqtt.session.ConstantReconnectDelayOptions;
+import io.vertx.mqtt.session.ReconnectDelayOptions;
+
+@DataObject(generateConverter = true)
+public class MqttClientSessionOptions extends MqttClientOptions {
+
+ private static final ReconnectDelayOptions DEFAULT_RECONNECT_DELAY = new ConstantReconnectDelayOptions();
+
+ private String hostname = MqttClientOptions.DEFAULT_HOST;
+ private int port = MqttClientOptions.DEFAULT_PORT;
+ private ReconnectDelayOptions reconnectDelay = DEFAULT_RECONNECT_DELAY;
+
+ /**
+ * Default constructor
+ */
+ public MqttClientSessionOptions() {
+ super();
+ }
+
+ /**
+ * Create an instance of MqttClientSessionOptions from JSON
+ *
+ * @param json the JSON
+ */
+ public MqttClientSessionOptions(JsonObject json) {
+ super(json);
+ MqttClientSessionOptionsConverter.fromJson(json, this);
+ }
+
+ /**
+ * Copy constructor
+ *
+ * @param other the options to copy
+ */
+ public MqttClientSessionOptions(MqttClientSessionOptions other) {
+ super(other);
+ this.hostname = other.hostname;
+ this.port = other.port;
+ this.reconnectDelay= other.reconnectDelay.copy();
+ }
+
+ public int getPort() {
+ return this.port;
+ }
+
+ public MqttClientSessionOptions setPort(int port) {
+ this.port = port;
+ return this;
+ }
+
+ public String getHostname() {
+ return this.hostname;
+ }
+
+ public MqttClientSessionOptions setHostname(String hostname) {
+ this.hostname = hostname;
+ return this;
+ }
+
+ public MqttClientSessionOptions setReconnectDelay(ReconnectDelayOptions reconnectDelay) {
+ this.reconnectDelay = reconnectDelay;
+ return this;
+ }
+
+ public ReconnectDelayOptions getReconnectDelay() {
+ return this.reconnectDelay;
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
index 8f92673a..47eceb9f 100644
--- a/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
+++ b/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java
@@ -615,8 +615,16 @@ private synchronized Handler unsubscribeCompletionHandler() {
*/
@Override
public MqttClient unsubscribe(String topic, Handler> unsubscribeSentHandler) {
+ return unsubscribe(Collections.singletonList(topic), unsubscribeSentHandler);
+ }
+
+ /**
+ * See {@link MqttClient#unsubscribe(List, Handler)} )} for more details
+ */
+ @Override
+ public MqttClient unsubscribe(List topics, Handler> unsubscribeSentHandler) {
- Future fut = unsubscribe(topic);
+ Future fut = unsubscribe(topics);
if (unsubscribeSentHandler != null) {
fut.onComplete(unsubscribeSentHandler);
}
@@ -629,6 +637,15 @@ public MqttClient unsubscribe(String topic, Handler> unsubs
@Override
public Future unsubscribe(String topic) {
+ return unsubscribe(Collections.singletonList(topic));
+ }
+
+ /**
+ * See {@link MqttClient#unsubscribe(List)} for more details
+ */
+ @Override
+ public Future unsubscribe(List topics) {
+
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
@@ -638,7 +655,7 @@ public Future unsubscribe(String topic) {
MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);
- MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));
+ MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
diff --git a/src/main/java/io/vertx/mqtt/impl/MqttClientSessionImpl.java b/src/main/java/io/vertx/mqtt/impl/MqttClientSessionImpl.java
new file mode 100644
index 00000000..61876bef
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/impl/MqttClientSessionImpl.java
@@ -0,0 +1,642 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.impl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientSession;
+import io.vertx.mqtt.MqttClientSessionOptions;
+import io.vertx.mqtt.session.RequestedQoS;
+import io.vertx.mqtt.session.SessionEvent;
+import io.vertx.mqtt.session.SessionState;
+import io.vertx.mqtt.messages.MqttConnAckMessage;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import io.vertx.mqtt.messages.MqttSubAckMessage;
+import io.vertx.mqtt.session.ReconnectDelayProvider;
+import io.vertx.mqtt.session.SubscriptionEvent;
+import io.vertx.mqtt.session.impl.SessionEventImpl;
+import io.vertx.mqtt.session.impl.SubscriptionEventImpl;
+import io.vertx.mqtt.session.SubscriptionState;
+
+public class MqttClientSessionImpl implements MqttClientSession {
+
+ private static final Logger log = LoggerFactory.getLogger(MqttClientSessionImpl.class);
+
+ private final VertxInternal vertx;
+ private final MqttClientSessionOptions options;
+
+ // record the subscriptions
+ private final Map subscriptions = new HashMap<>();
+ // record the pending subscribes
+ private final Map> pendingSubscribes = new HashMap<>();
+ // record the pending unsubscribes
+ private final Map> pendingUnsubscribes = new HashMap<>();
+ // the provider for the reconnect delay
+ private final ReconnectDelayProvider reconnectDelay;
+
+ // the current state
+ private SessionState state = SessionState.DISCONNECTED;
+ // drives to connection either to CONNECTED or DISCONNECTED
+ private boolean running;
+
+ // holds the actual MQTT client connection
+ private MqttClient client;
+ // an optional reconnect timer
+ private Long reconnectTimer;
+
+ private volatile Handler messageHandler;
+ private volatile Handler sessionStateHandler;
+ private volatile Handler subscriptionStateHandler;
+ private volatile Handler publishCompleteHandler;
+ private volatile Handler publishCompletionExpirationHandler;
+ private volatile Handler publishCompletionUnknownPacketIdHandler;
+
+ /**
+ * Create a new instance, which is not started.
+ *
+ * @param vertx The vert.x instance to use.
+ * @param options The client session options.
+ */
+ public MqttClientSessionImpl(final Vertx vertx, final MqttClientSessionOptions options) {
+ this.vertx = (VertxInternal) vertx;
+ this.options = options;
+ this.reconnectDelay = options.getReconnectDelay().createProvider();
+ }
+
+ @Override
+ public void start() {
+ this.vertx.runOnContext(x -> doStart());
+ }
+
+ @Override
+ public void stop() {
+ this.vertx.runOnContext(x -> doStop());
+ }
+
+ @Override
+ public MqttClientSession subscribe(Map topics) {
+ final Map finalTopics = new LinkedHashMap<>(topics);
+ this.vertx.runOnContext(x -> doSubscribe(finalTopics));
+ return this;
+ }
+
+ @Override
+ public MqttClientSession unsubscribe(List topics) {
+ final Set finalTopics = new HashSet<>(topics);
+ this.vertx.runOnContext(x -> doUnsubscribe(finalTopics));
+ return this;
+ }
+
+ private void doStart() {
+ if (this.running) {
+ // nothing to do
+ return;
+ }
+
+ // we connect, not re-connect
+ this.reconnectDelay.reset();
+
+ this.running = true;
+ switch (this.state) {
+ case DISCONNECTED:
+ // initiate connection
+ createConnection();
+ break;
+ case CONNECTING:
+ // nothing to do
+ break;
+ case CONNECTED:
+ // nothing to do
+ break;
+ case DISCONNECTING:
+ // we do nothing here and wait until the disconnection advanced, which will then trigger a re-connect
+ break;
+ }
+ }
+
+ private void doStop() {
+ if (!this.running) {
+ // nothing to do
+ return;
+ }
+
+ this.running = false;
+
+ if (this.reconnectTimer != null) {
+ // we have a re-connect scheduled, but stop right now.
+ this.vertx.cancelTimer(this.reconnectTimer);
+ }
+
+ switch (this.state) {
+ case CONNECTED:
+ closeConnection((Throwable) null);
+ break;
+ case DISCONNECTED:
+ // nothing to do
+ break;
+ case DISCONNECTING:
+ // nothing do do
+ break;
+ case CONNECTING:
+ // we do nothing here and wait, until the connection advanced, which will then trigger a disconnect
+ break;
+ }
+ }
+
+ @Override
+ public MqttClientSession sessionStateHandler(Handler sessionStateHandler) {
+ this.sessionStateHandler = sessionStateHandler;
+ return this;
+ }
+
+ @Override
+ public MqttClientSession subscriptionStateHandler(Handler subscriptionStateHandler) {
+ this.subscriptionStateHandler = subscriptionStateHandler;
+ return this;
+ }
+
+ @Override
+ public MqttClientSession publishCompletionHandler(Handler publishCompleteHandler) {
+ this.publishCompleteHandler = publishCompleteHandler;
+ return this;
+ }
+
+ @Override
+ public MqttClientSession publishCompletionExpirationHandler(Handler publishCompletionExpirationHandler) {
+ this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
+ return this;
+ }
+
+ @Override
+ public MqttClientSession publishCompletionUnknownPacketIdHandler(Handler publishCompletionUnknownPacketIdHandler) {
+ this.publishCompletionUnknownPacketIdHandler = publishCompletionUnknownPacketIdHandler;
+ return this;
+ }
+
+ @Override
+ public MqttClientSession messageHandler(Handler messageHandler) {
+ this.messageHandler = messageHandler;
+ return this;
+ }
+
+ /**
+ * Set the state of the session.
+ *
+ * @param sessionState The new state.
+ * @param cause The optional cause, in case of an error.
+ */
+ private void setState(final SessionState sessionState, final Throwable cause) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("setState - current: %s, next: %s", this.state, sessionState), cause);
+ }
+
+ // before announcing our state change
+
+ switch (sessionState) {
+ case CONNECTING:
+ break;
+ case CONNECTED:
+ // successful connection, reset delay
+ this.reconnectDelay.reset();
+ break;
+ case DISCONNECTING:
+ break;
+ case DISCONNECTED:
+ this.pendingUnsubscribes.clear();
+ this.pendingSubscribes.clear();
+ for (String topic : this.subscriptions.keySet()) {
+ notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+ }
+ break;
+ }
+
+ // announce state change
+
+ if (this.state != sessionState) {
+ this.state = sessionState;
+ Handler handler = this.sessionStateHandler;
+ if (handler != null) {
+ handler.handle(new SessionEventImpl(sessionState, cause));
+ }
+ }
+
+ // after announcing out state change
+
+ switch (this.state) {
+ case CONNECTING:
+ // we just wait for the outcome
+ break;
+ case CONNECTED:
+ if (!this.running) {
+ closeConnection((Throwable) null);
+ }
+ break;
+ case DISCONNECTING:
+ // we just wait for the outcome
+ break;
+ case DISCONNECTED:
+ if (this.running) {
+ scheduleReconnect();
+ }
+ break;
+ }
+ }
+
+ private void notifySubscriptionState(final String topic, final SubscriptionState state, final Integer grantedQoS) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("setSubscriptionState - topic: %s, state: %s, grantedQoS: %s", topic, state, grantedQoS));
+ }
+
+ Handler handler = this.subscriptionStateHandler;
+ if (handler != null) {
+ handler.handle(new SubscriptionEventImpl(topic, state, grantedQoS));
+ }
+
+ }
+
+ private void scheduleReconnect() {
+ log.debug("Scheduling reconnect");
+
+ if (this.reconnectTimer == null) {
+
+ final Duration delay = nextDelay();
+ if (log.isDebugEnabled()) {
+ log.debug("Next delay: " + delay);
+ }
+
+ final long timer = vertx.setTimer(delay.toMillis(), x -> createConnection());
+ if (log.isDebugEnabled()) {
+ log.debug("Timer set: " + timer);
+ }
+
+ this.reconnectTimer = timer;
+ }
+ }
+
+ /**
+ * Calculate the next delay before trying to re-connect.
+ *
+ * @return The duration to wait.
+ */
+ private Duration nextDelay() {
+ return this.reconnectDelay.nextDelay();
+ }
+
+ /**
+ * Initiates the connection.
+ */
+ private void createConnection() {
+ log.debug("Creating connection");
+
+ // clear reconnect timer
+ this.reconnectTimer = null;
+
+ // create client
+ this.client = MqttClient.create(this.vertx, this.options);
+ this.client.exceptionHandler(this::closeConnection);
+ this.client.closeHandler(x -> connectionClosed());
+ this.client.publishHandler(this::serverPublished);
+ this.client.subscribeCompletionHandler(this::subscribeCompleted);
+ this.client.unsubscribeCompletionHandler(this::unsubscribeCompleted);
+ this.client.publishCompletionHandler(this::publishComplete);
+ this.client.publishCompletionExpirationHandler(this::publishExpired);
+ this.client.publishCompletionUnknownPacketIdHandler(this::publishCompletionUnknown);
+
+ // change state
+ setState(SessionState.CONNECTING, null);
+ // start connection
+ this.client.connect(this.options.getPort(), this.options.getHostname()).onComplete(this::connectCompleted);
+ }
+
+ /**
+ * Initiates the connection shutdown.
+ */
+ private void closeConnection(Throwable cause) {
+ log.debug("Closing connection", cause);
+
+ setState(SessionState.DISCONNECTING, cause);
+ this.client.disconnect().onComplete(this::disconnectCompleted);
+ }
+
+ /**
+ * Gets called when the connect call was processed.
+ *
+ * @param result The outcome of the connect call.
+ */
+ private void connectCompleted(AsyncResult result) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Connect completed - result: %s, cause: %s", result.result(), result.cause()));
+ }
+
+ if (result.failed() || result.result() == null) {
+ // this will include CONACKs with error codes
+ setState(SessionState.DISCONNECTED, result.cause());
+ return;
+ }
+
+ MqttConnAckMessage ack = result.result();
+
+ setState(SessionState.CONNECTED, null);
+
+ if (!this.subscriptions.isEmpty() && (options.isCleanSession() || !ack.isSessionPresent())) {
+ // re-subscribe if we have requested subscriptions and (either cleanSession=true or no session found on the server)
+ requestSubscribe(new LinkedHashMap<>(this.subscriptions));
+ }
+ }
+
+ /**
+ * Gets called when the disconnect call was processed.
+ *
+ * @param result The outcome of the disconnect call.
+ */
+ private void disconnectCompleted(AsyncResult> result) {
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Disconnect completed - result: %s, cause: %s", result.result(), result.cause()));
+ }
+
+ connectionClosed(result.cause());
+ }
+
+ /**
+ * Gets called internally when the only reasonable action is to just disconnect.
+ *
+ * If the session is still running, then it will trigger a re-connect.
+ *
+ * @param reason The reason message.
+ */
+ private void closeConnection(final String reason) {
+ closeConnection(new VertxException(reason).fillInStackTrace());
+ }
+
+ /**
+ * Gets called when the connection just dropped.
+ */
+ private void connectionClosed() {
+ if (this.state != SessionState.DISCONNECTING) {
+ // this came unexpected
+ connectionClosed(new VertxException("Connection closed"));
+ }
+ }
+
+ /**
+ * Called to clean up the after a connection was closed.
+ *
+ * @param cause The cause of the connection closure.
+ */
+ private void connectionClosed(final Throwable cause) {
+ log.info("Connection closed", cause);
+
+ if (this.client != null) {
+ this.client.exceptionHandler(null);
+ this.client.publishHandler(null);
+ this.client.closeHandler(null);
+ this.client.subscribeCompletionHandler(null);
+ this.client.publishCompletionHandler(null);
+ this.client.publishCompletionExpirationHandler(null);
+ this.client.publishCompletionUnknownPacketIdHandler(null);
+ this.client = null;
+ }
+ setState(SessionState.DISCONNECTED, cause);
+ }
+
+ /**
+ * Gets called when the server published a message for us.
+ *
+ * @param message The published message.
+ */
+ private void serverPublished(MqttPublishMessage message) {
+ if (log.isDebugEnabled()) {
+ log.debug("Server published: " + message);
+ }
+
+ Handler publishHandler = this.messageHandler;
+ if (publishHandler != null) {
+ publishHandler.handle(message);
+ }
+ }
+
+ /**
+ * Perform subscribing.
+ *
+ * @param topics The topics to subscribe to.
+ */
+ private void doSubscribe(Map topics) {
+ final LinkedHashMap subscriptions = new LinkedHashMap<>(topics.size());
+
+ for (Map.Entry entry : topics.entrySet()) {
+ this.subscriptions.compute(entry.getKey(), (key, current) -> {
+ if (current != entry.getValue()) {
+ subscriptions.put(entry.getKey(), entry.getValue());
+ }
+ return entry.getValue();
+ });
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Requesting subscribe: " + subscriptions);
+ }
+ requestSubscribe(subscriptions);
+ }
+
+ /**
+ * Perform unsubscribing.
+ *
+ * @param topics The topics to unsubscribe from.
+ */
+ private void doUnsubscribe(Set topics) {
+ final List topicsToSend = new ArrayList<>(topics.size());
+ for (String topic : topics) {
+ if (this.subscriptions.remove(topic) != null) {
+ topicsToSend.add(topic);
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Requesting unsubscribe: " + topicsToSend);
+ }
+ requestUnsubscribe(topicsToSend);
+ }
+
+ /**
+ * Request to subscribe from the server.
+ *
+ * @param topics The topics to subscribe to, including the requested QoS.
+ */
+ private void requestSubscribe(LinkedHashMap topics) {
+ if (topics.isEmpty() || this.client == null) {
+ // nothing to do
+ return;
+ }
+
+ this.client
+ .subscribe(topics.entrySet()
+ .stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().toInteger()
+ )))
+ .onComplete(result -> subscribeSent(result, topics));
+ }
+
+ /**
+ * Request to unsubscribe from the server.
+ *
+ * @param topics The topic to unsubscribe from.
+ */
+ private void requestUnsubscribe(List topics) {
+ if (topics.isEmpty() || this.client == null) {
+ // nothing to do
+ return;
+ }
+
+ this.client
+ .unsubscribe(topics)
+ .onComplete(result -> unsubscribeSent(result, topics));
+ }
+
+ /**
+ * Called when the subscribe call was sent.
+ *
+ * @param result The result of sending the request, contains the packet id.
+ */
+ private void subscribeSent(AsyncResult result, LinkedHashMap topics) {
+ if (result.failed() || result.result() == null) {
+ // failed
+ for (String topic : topics.keySet()) {
+ notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+ }
+ } else {
+ // record request
+ for (String topic : topics.keySet()) {
+ notifySubscriptionState(topic, SubscriptionState.SUBSCRIBING, null);
+ }
+ this.pendingSubscribes.put(result.result(), topics);
+ }
+ }
+
+ /**
+ * Called when the unsubscribe call was sent.
+ *
+ * @param result The result of sending the request, contains the packet id.
+ */
+ private void unsubscribeSent(AsyncResult result, List topics) {
+ if (result.failed() || result.result() == null) {
+ closeConnection(String.format("Failed to send unsubscribe request: %s", result.cause()));
+ } else {
+ this.pendingUnsubscribes.put(result.result(), topics);
+ }
+ }
+
+ /**
+ * Called when the server processed the request to subscribe.
+ *
+ * @param ack The acknowledge message.
+ */
+ private void subscribeCompleted(MqttSubAckMessage ack) {
+ LinkedHashMap request = this.pendingSubscribes.remove(ack.messageId());
+ if (request == null) {
+ closeConnection(String.format("Unexpected subscription ack response - messageId: %s", ack.messageId()));
+ return;
+ }
+ if (request.size() != ack.grantedQoSLevels().size()) {
+ closeConnection(String.format("Mismatch of topics on subscription ack - expected: %d, actual: %d", request.size(), ack.grantedQoSLevels().size()));
+ return;
+ }
+
+ int idx = 0;
+ for (String topic : request.keySet()) {
+ Integer grantedQoS = ack.grantedQoSLevels().get(idx);
+ notifySubscriptionState(topic, SubscriptionState.SUBSCRIBED, grantedQoS);
+ idx += 1;
+ }
+ }
+
+ /**
+ * Called when the server processed the request to unsubscribe.
+ *
+ * @param messageId The ID of the message that completed.
+ */
+ private void unsubscribeCompleted(Integer messageId) {
+ List request = this.pendingUnsubscribes.remove(messageId);
+ for (String topic : request) {
+ notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+ }
+ }
+
+ @Override
+ public Future publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
+ Promise future = Promise.promise();
+ this.vertx
+ .runOnContext(x -> doPublish(topic, payload, qosLevel, isDup, isRetain)
+ .onComplete(future));
+ return future.future();
+ }
+
+ private Future doPublish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
+ if (this.client != null) {
+ return this.client.publish(topic, payload, qosLevel, isDup, isRetain);
+ } else {
+ return Future.failedFuture("Session is not connected");
+ }
+ }
+
+ private void publishComplete(Integer messageId) {
+ Handler handler = this.publishCompleteHandler;
+ if (handler != null) {
+ handler.handle(messageId);
+ }
+ }
+
+ private void publishExpired(Integer messageId) {
+ Handler handler = this.publishCompletionExpirationHandler;
+ if (handler != null) {
+ handler.handle(messageId);
+ }
+ }
+
+ private void publishCompletionUnknown(Integer messageId) {
+ Handler handler = this.publishCompletionUnknownPacketIdHandler;
+ if (handler != null) {
+ handler.handle(messageId);
+ }
+ }
+
+}
diff --git a/src/main/java/io/vertx/mqtt/session/ConstantReconnectDelayOptions.java b/src/main/java/io/vertx/mqtt/session/ConstantReconnectDelayOptions.java
new file mode 100644
index 00000000..7aeec4ec
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/ConstantReconnectDelayOptions.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import java.time.Duration;
+
+import io.vertx.codegen.annotations.DataObject;
+import io.vertx.codegen.annotations.Fluent;
+import io.vertx.core.json.JsonObject;
+
+@DataObject(generateConverter = true)
+public class ConstantReconnectDelayOptions implements ReconnectDelayOptions {
+
+ private static final Duration DEFAULT_DELAY = Duration.ofSeconds(10);
+
+ private Duration delay = DEFAULT_DELAY;
+
+ public ConstantReconnectDelayOptions() {
+ }
+
+ /**
+ * Create an instance of ConstantReconnectDelayOptions from JSON
+ *
+ * @param json the JSON
+ */
+ public ConstantReconnectDelayOptions(JsonObject json) {
+ ConstantReconnectDelayOptionsConverter.fromJson(json, this);
+ }
+
+ @Fluent
+ public ConstantReconnectDelayOptions setDelay(Duration delay) {
+ this.delay = delay;
+ return this;
+ }
+
+ public Duration getDelay() {
+ return this.delay;
+ }
+
+ @Override
+ public ReconnectDelayProvider createProvider() {
+
+ final Duration delay = this.delay;
+
+ return new ReconnectDelayProvider() {
+
+ @Override
+ public Duration nextDelay() {
+ return delay;
+ }
+
+ @Override
+ public void reset() {
+ // no-op
+ }
+ };
+
+ }
+
+ @Override
+ public ReconnectDelayOptions copy() {
+ ConstantReconnectDelayOptions result = new ConstantReconnectDelayOptions();
+ result.delay = this.delay;
+ return result;
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/session/ExponentialBackoffDelayOptions.java b/src/main/java/io/vertx/mqtt/session/ExponentialBackoffDelayOptions.java
new file mode 100644
index 00000000..b3b74776
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/ExponentialBackoffDelayOptions.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import java.time.Duration;
+
+import io.vertx.codegen.annotations.DataObject;
+import io.vertx.codegen.annotations.Fluent;
+import io.vertx.core.json.JsonObject;
+
+@DataObject(generateConverter = true)
+public class ExponentialBackoffDelayOptions implements ReconnectDelayOptions {
+
+ private static final Duration DEFAULT_MINIMUM = Duration.ofSeconds(1);
+ private static final Duration DEFAULT_INCREMENT = Duration.ofSeconds(1);
+ private static final Duration DEFAULT_MAXIMUM = Duration.ofMinutes(5);
+
+ private Duration minimum = DEFAULT_MINIMUM;
+ private Duration increment = DEFAULT_INCREMENT;
+ private Duration maximum = DEFAULT_MAXIMUM;
+
+ public ExponentialBackoffDelayOptions() {
+ }
+
+ /**
+ * Create an instance of ConstantReconnectDelayOptions from JSON
+ *
+ * @param json the JSON
+ */
+ public ExponentialBackoffDelayOptions(JsonObject json) {
+ ExponentialBackoffDelayOptionsConverter.fromJson(json, this);
+ }
+
+ @Fluent
+ public ExponentialBackoffDelayOptions setIncrement(Duration increment) {
+ this.increment = increment;
+ return this;
+ }
+
+ public Duration getIncrement() {
+ return this.increment;
+ }
+
+ @Fluent
+ public ExponentialBackoffDelayOptions setMaximum(Duration maximum) {
+ this.maximum = maximum;
+ return this;
+ }
+
+ public Duration getMaximum() {
+ return this.maximum;
+ }
+
+ @Fluent
+ public ExponentialBackoffDelayOptions setMinimum(Duration minimum) {
+ this.minimum = minimum;
+ return this;
+ }
+
+ public Duration getMinimum() {
+ return this.minimum;
+ }
+
+ private void validate() {
+ if (this.minimum.isNegative() ) {
+ throw new IllegalArgumentException("'minimum' must be a positive or zero duration");
+ }
+ if (this.increment.isNegative() || this.increment.isZero()) {
+ throw new IllegalArgumentException("'increment' must be a positive duration");
+ }
+ if (this.maximum.isNegative() || this.maximum.isZero()) {
+ throw new IllegalArgumentException("'maximum' must be a positive duration");
+ }
+ if (this.maximum.compareTo(this.minimum) < 0) {
+ throw new IllegalArgumentException("'minimum' must be less than (or equal) to the maximum");
+ }
+ }
+
+ @Override
+ public ReconnectDelayProvider createProvider() {
+ validate();
+
+ long num = this.maximum.minus(this.minimum).toMillis() / this.increment.toMillis();
+ long max = (long)(Math.log(num) / Math.log(2))+1;
+
+ return new Provider(this.minimum, this.increment, this.maximum, max);
+ }
+
+ @Override
+ public ReconnectDelayOptions copy() {
+ ExponentialBackoffDelayOptions result = new ExponentialBackoffDelayOptions();
+ result.minimum = this.minimum;
+ result.increment = this.increment;
+ result.maximum = this.maximum;
+ return result;
+ }
+
+ private static class Provider implements ReconnectDelayProvider {
+
+ private final Duration minimum;
+ private final Duration increment;
+ private final Duration maximum;
+ private final long max;
+
+ private long count;
+
+ Provider(Duration minimum, Duration increment, Duration maximum, long max) {
+ this.minimum = minimum;
+ this.increment = increment;
+ this.maximum = maximum;
+ this.max = max;
+ }
+
+ @Override
+ public Duration nextDelay() {
+
+ if (this.count <= this.max) {
+
+ Duration delay = this.minimum;
+ if (this.count > 0) {
+ delay = delay.plus(this.increment.multipliedBy((long)Math.pow(2, this.count-1)));
+ }
+
+ this.count += 1;
+
+ return delay;
+ } else {
+ return this.maximum;
+ }
+
+ }
+
+ @Override
+ public void reset() {
+ this.count = 0;
+ }
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/session/ReconnectDelayOptions.java b/src/main/java/io/vertx/mqtt/session/ReconnectDelayOptions.java
new file mode 100644
index 00000000..d7cb6674
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/ReconnectDelayOptions.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+public interface ReconnectDelayOptions {
+
+ ReconnectDelayProvider createProvider();
+
+ ReconnectDelayOptions copy();
+
+}
diff --git a/src/main/java/io/vertx/mqtt/session/ReconnectDelayProvider.java b/src/main/java/io/vertx/mqtt/session/ReconnectDelayProvider.java
new file mode 100644
index 00000000..f1463f99
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/ReconnectDelayProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import java.time.Duration;
+
+public interface ReconnectDelayProvider {
+
+ Duration nextDelay();
+
+ void reset();
+
+}
diff --git a/src/main/java/io/vertx/mqtt/session/RequestedQoS.java b/src/main/java/io/vertx/mqtt/session/RequestedQoS.java
new file mode 100644
index 00000000..dbc65b1b
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/RequestedQoS.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+/**
+ * The requested QoS level.
+ *
+ * NOTE: This is missing QoS 2, as this mode is not properly supported by the session.
+ */
+public enum RequestedQoS {
+ QOS_0(0),
+ QOS_1(1);
+
+ private final int value;
+
+ RequestedQoS(int value) {
+ this.value = value;
+ }
+
+ public int toInteger() {
+ return this.value;
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/session/SessionEvent.java b/src/main/java/io/vertx/mqtt/session/SessionEvent.java
new file mode 100644
index 00000000..8aad56bb
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/SessionEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import io.vertx.codegen.annotations.VertxGen;
+
+@VertxGen
+public interface SessionEvent {
+ SessionState getSessionState();
+
+ Throwable getCause();
+}
diff --git a/src/main/java/io/vertx/mqtt/session/SessionState.java b/src/main/java/io/vertx/mqtt/session/SessionState.java
new file mode 100644
index 00000000..bc0acd99
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/SessionState.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+/**
+ * The state of the session.
+ */
+public enum SessionState {
+ /**
+ * The session is disconnected.
+ *
+ * A re-connect timer may be pending.
+ */
+ DISCONNECTED,
+ /**
+ * The session started to connect.
+ *
+ * This may include re-subscribing to any topics after the connect call was successful.
+ */
+ CONNECTING,
+ /**
+ * The session is connected.
+ */
+ CONNECTED,
+ /**
+ * The session is in the process of an orderly disconnect.
+ */
+ DISCONNECTING,
+}
diff --git a/src/main/java/io/vertx/mqtt/session/SubscriptionEvent.java b/src/main/java/io/vertx/mqtt/session/SubscriptionEvent.java
new file mode 100644
index 00000000..a6908429
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/SubscriptionEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import io.vertx.codegen.annotations.VertxGen;
+
+@VertxGen
+public interface SubscriptionEvent {
+ Integer getQos();
+
+ SubscriptionState getSubscriptionState();
+
+ String getTopic();
+}
diff --git a/src/main/java/io/vertx/mqtt/session/SubscriptionState.java b/src/main/java/io/vertx/mqtt/session/SubscriptionState.java
new file mode 100644
index 00000000..4d96a98c
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/SubscriptionState.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+/**
+ * The state of a subscription.
+ *
+ * Subscriptions established when a new topic gets added, or the connection was established. If the subscribe call
+ * returns an error for the subscription, the state will remain {@link #FAILED} and it will not try to re-subscribe
+ * while the connection is active.
+ *
+ * When the session (connection) disconnects, all subscriptions will automatically be reset to {@link #UNSUBSCRIBED}.
+ */
+public enum SubscriptionState {
+ /**
+ * The topic is not subscribed.
+ */
+ UNSUBSCRIBED,
+ /**
+ * The topic is in the process of subscribing.
+ */
+ SUBSCRIBING,
+ /**
+ * The topic is subscribed.
+ */
+ SUBSCRIBED,
+ /**
+ * The topic could not be subscribed.
+ */
+ FAILED,
+}
diff --git a/src/main/java/io/vertx/mqtt/session/impl/SessionEventImpl.java b/src/main/java/io/vertx/mqtt/session/impl/SessionEventImpl.java
new file mode 100644
index 00000000..9ceae00c
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/impl/SessionEventImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session.impl;
+
+import io.vertx.mqtt.session.SessionEvent;
+import io.vertx.mqtt.session.SessionState;
+
+/**
+ * An event of a session state change.
+ */
+public class SessionEventImpl implements SessionEvent {
+
+ private final SessionState sessionState;
+ private final Throwable cause;
+
+ public SessionEventImpl(final SessionState sessionState, final Throwable reason) {
+ this.sessionState = sessionState;
+ this.cause = reason;
+ }
+
+ /**
+ * The new state of the session.
+ *
+ * @return The state.
+ */
+ @Override
+ public SessionState getSessionState() {
+ return this.sessionState;
+ }
+
+ /**
+ * The (optional) cause of change.
+ *
+ * @return The throwable that causes the state change, or {@code null}, if there was none.
+ */
+ @Override
+ public Throwable getCause() {
+ return this.cause;
+ }
+}
diff --git a/src/main/java/io/vertx/mqtt/session/impl/SubscriptionEventImpl.java b/src/main/java/io/vertx/mqtt/session/impl/SubscriptionEventImpl.java
new file mode 100644
index 00000000..f0bfa4df
--- /dev/null
+++ b/src/main/java/io/vertx/mqtt/session/impl/SubscriptionEventImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session.impl;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import io.vertx.mqtt.session.SubscriptionEvent;
+import io.vertx.mqtt.session.SubscriptionState;
+
+/**
+ * An event of a subscription state change.
+ */
+public class SubscriptionEventImpl implements SubscriptionEvent {
+ private final String topic;
+ private final SubscriptionState subscriptionState;
+ private final Integer qos;
+
+ public SubscriptionEventImpl(final String topic, final SubscriptionState subscriptionState, final Integer qos) {
+ this.topic = topic;
+ this.subscriptionState = subscriptionState;
+ this.qos = qos;
+ }
+
+ /**
+ * The granted QoS level from the server.
+ *
+ * @return When the state changed to {@link SubscriptionState#SUBSCRIBED}, it contains the QoS level granted by
+ * the server. Otherwise it will be {@code null}.
+ */
+ @Override
+ public Integer getQos() {
+ return this.qos;
+ }
+
+ /**
+ * The new subscription state.
+ *
+ * @return The state.
+ */
+ @Override
+ public SubscriptionState getSubscriptionState() {
+ return this.subscriptionState;
+ }
+
+ /**
+ * The name of the topic this change refers to.
+ *
+ * @return The topic name.
+ */
+ @Override
+ public String getTopic() {
+ return this.topic;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SubscriptionEventImpl that = (SubscriptionEventImpl) o;
+ return topic.equals(that.topic) && subscriptionState == that.subscriptionState && Objects.equals(qos, that.qos);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topic, subscriptionState, qos);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", SubscriptionEventImpl.class.getSimpleName() + "[", "]")
+ .add("topic='" + topic + "'")
+ .add("subscriptionState=" + subscriptionState)
+ .add("qos=" + qos)
+ .toString();
+ }
+}
diff --git a/src/test/java/io/vertx/mqtt/session/ExponentialProviderTest.java b/src/test/java/io/vertx/mqtt/session/ExponentialProviderTest.java
new file mode 100644
index 00000000..aa14cb99
--- /dev/null
+++ b/src/test/java/io/vertx/mqtt/session/ExponentialProviderTest.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.time.Duration;
+
+import org.junit.Test;
+
+public class ExponentialProviderTest {
+
+ @Test
+ public void testDefaults() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ ReconnectDelayProvider provider = opts.createProvider();
+ assertNotNull(provider);
+ }
+
+ @Test(expected = Throwable.class)
+ public void testMinGreaterThanMax() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setMinimum(Duration.ofSeconds(10));
+ opts.setMaximum(Duration.ofSeconds(5));
+ opts.createProvider();
+ }
+
+ @Test(expected = Throwable.class)
+ public void testNegativeMin() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setMinimum(Duration.ofSeconds(-10));
+ opts.createProvider();
+ }
+
+ @Test(expected = Throwable.class)
+ public void testNegativeInc() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setIncrement(Duration.ofSeconds(-10));
+ opts.createProvider();
+ }
+
+ @Test(expected = Throwable.class)
+ public void testNegativeMax() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setIncrement(Duration.ofSeconds(-10));
+ opts.createProvider();
+ }
+
+ @Test
+ public void testMaximumDefault() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ ReconnectDelayProvider provider = opts.createProvider();
+ assertEquals(Duration.ofSeconds(1+0), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+1), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+2), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+4), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+8), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+16), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+32), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+64), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+128), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1+256), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ }
+
+ @Test
+ public void testMaximumZeroInitial() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setMinimum(Duration.ZERO);
+ ReconnectDelayProvider provider = opts.createProvider();
+
+ assertEquals(Duration.ofSeconds(0), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(1), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(2), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(4), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(8), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(16), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(32), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(64), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(128), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(256), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ }
+
+ @Test
+ public void testMaximumIncreased() {
+ ExponentialBackoffDelayOptions opts = new ExponentialBackoffDelayOptions();
+ opts.setMinimum(Duration.ZERO);
+ opts.setIncrement(Duration.ofSeconds(4));
+ ReconnectDelayProvider provider = opts.createProvider();
+
+ assertEquals(Duration.ofSeconds(0), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(4), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(8), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(16), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(32), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(64), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(128), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(256), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ assertEquals(Duration.ofSeconds(300), provider.nextDelay());
+ }
+}
diff --git a/src/test/java/io/vertx/mqtt/test/client/MqttClientSessionTest.java b/src/test/java/io/vertx/mqtt/test/client/MqttClientSessionTest.java
new file mode 100644
index 00000000..d1a1442a
--- /dev/null
+++ b/src/test/java/io/vertx/mqtt/test/client/MqttClientSessionTest.java
@@ -0,0 +1,510 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.vertx.mqtt.test.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.mqtt.MqttClientSession;
+import io.vertx.mqtt.MqttClientSessionOptions;
+import io.vertx.mqtt.MqttEndpoint;
+import io.vertx.mqtt.MqttServer;
+import io.vertx.mqtt.MqttTopicSubscription;
+import io.vertx.mqtt.session.RequestedQoS;
+import io.vertx.mqtt.session.SessionState;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import io.vertx.mqtt.session.SubscriptionEvent;
+import io.vertx.mqtt.session.impl.SubscriptionEventImpl;
+import io.vertx.mqtt.session.SubscriptionState;
+
+@RunWith(VertxUnitRunner.class)
+public class MqttClientSessionTest {
+ private static final Logger log = LoggerFactory.getLogger(MqttClientSessionTest.class);
+ private static final int MQTT_SERVER_TLS_PORT = 1883;
+ private static final String MQTT_SERVER_HOST = "localhost";
+
+ static class SubscribeTestResult {
+ LinkedList events;
+ List payloads;
+ }
+
+ Vertx vertx = Vertx.vertx();
+ MqttServer server;
+ TestContext ctx;
+ List serverMessages = new LinkedList<>();
+
+ @Before
+ public void setup(TestContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @After
+ public void after() {
+ stopServer();
+ }
+
+ void startServer() {
+ Async async = ctx.async();
+ startServerAsync()
+ .onFailure(ctx::fail)
+ .onSuccess(x -> async.complete());
+
+ async.await();
+ }
+
+ Future> startServerAsync() {
+ server = MqttServer.create(vertx);
+ server.exceptionHandler(t -> ctx.fail());
+
+ Promise result = Promise.promise();
+ server
+ .endpointHandler(server -> MqttClientSessionTest.serverLogic(vertx, server, this.serverMessages::add))
+ .listen(result);
+
+ return result.future();
+ }
+
+ void stopServer() {
+ if (this.server != null) {
+ this.server.close(ctx.asyncAssertSuccess(v -> {
+ this.vertx.close(ctx.asyncAssertSuccess());
+ }));
+ this.server = null;
+ }
+ }
+
+ private static void serverLogic(Vertx vertx, MqttEndpoint endpoint, Handler fromClient) {
+ log.info("[SERVER] Client connected");
+
+ Map publishTimers = new HashMap<>();
+
+ endpoint.subscribeHandler(subscribe -> {
+ String names = subscribe.topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.joining(", "));
+ log.info("[SERVER] Received SUBSCRIBE with message id = " + subscribe.messageId() + " topics = " + names);
+
+ List grantedQosLevels = new ArrayList<>();
+ for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
+ final String topicName = s.topicName();
+ final MqttQoS qos;
+ switch (topicName) {
+ case "foo":
+ case "bar":
+ case "baz/#":
+ case "qos0":
+ qos = MqttQoS.valueOf(Math.min(s.qualityOfService().value(), 0));
+ break;
+ case "qos1":
+ qos = MqttQoS.valueOf(Math.min(s.qualityOfService().value(), 1));
+ break;
+ default:
+ qos = MqttQoS.FAILURE;
+ }
+
+ grantedQosLevels.add(qos);
+
+ if (qos != MqttQoS.FAILURE) {
+ AtomicLong l = new AtomicLong();
+ long t = vertx.setPeriodic(1_000, x -> {
+ long value = l.getAndIncrement();
+ endpoint.publish(topicName, Buffer.buffer("payload" + value), qos, false, false);
+ });
+ Long old = publishTimers.put(topicName, t);
+ if (old != null) {
+ vertx.cancelTimer(old);
+ }
+ }
+
+ }
+
+ // ack the subscriptions request
+ endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
+
+ });
+
+ endpoint.unsubscribeHandler(u -> {
+ log.info("[SERVER] Received UNSUBSCRIBE with message id = " + u.messageId());
+ for (String topic : u.topics()) {
+ long t = publishTimers.remove(topic);
+ vertx.cancelTimer(t);
+ }
+ endpoint.unsubscribeAcknowledge(u.messageId());
+ });
+
+ endpoint.publishHandler(p -> {
+ log.info("[SERVER] Received PUBLISH with message id = " + p.messageId());
+ fromClient.handle(p);
+ switch (p.qosLevel()) {
+ case AT_MOST_ONCE:
+ break;
+ case AT_LEAST_ONCE:
+ endpoint.publishAcknowledge(p.messageId());
+ break;
+ }
+ });
+ endpoint.disconnectHandler(d -> {
+ publishTimers.values().forEach(vertx::cancelTimer);
+ log.info("[SERVER] Client disconnected");
+ });
+
+ // accept session
+
+ endpoint.accept(false);
+ }
+
+ /**
+ * Test starting a server first, and then connecting.
+ */
+ @Test
+ public void testConnect() {
+
+ startServer();
+
+ MqttClientSessionOptions options = new MqttClientSessionOptions()
+ .setPort(MQTT_SERVER_TLS_PORT)
+ .setHostname(MQTT_SERVER_HOST);
+ MqttClientSession client = MqttClientSession
+ .create(vertx, options);
+
+ LinkedList sessionStates = new LinkedList<>();
+
+ Async async = ctx.async();
+
+ client.sessionStateHandler(event -> {
+ sessionStates.add(event.getSessionState());
+
+ switch (event.getSessionState()) {
+ case CONNECTED:
+ client.stop();
+ break;
+ case DISCONNECTED:
+ async.complete();
+ break;
+ }
+ });
+
+ client.start();
+
+ async.await();
+
+ assertArrayEquals(new Object[]{
+ SessionState.CONNECTING,
+ SessionState.CONNECTED,
+ SessionState.DISCONNECTING,
+ SessionState.DISCONNECTED
+ }, sessionStates.toArray());
+ }
+
+ /**
+ * Test starting a connection before the server is running. The session must connect late.
+ */
+ @Test
+ public void testConnectLate() {
+
+ MqttClientSessionOptions options = new MqttClientSessionOptions()
+ .setPort(MQTT_SERVER_TLS_PORT)
+ .setHostname(MQTT_SERVER_HOST);
+ MqttClientSession client = MqttClientSession
+ .create(vertx, options);
+
+ LinkedList sessionStates = new LinkedList<>();
+
+ Async async = ctx.async(2);
+
+ client.sessionStateHandler(event -> {
+ sessionStates.add(event.getSessionState());
+
+ switch (event.getSessionState()) {
+ case CONNECTED:
+ client.stop();
+ break;
+ case DISCONNECTED:
+ async.countDown();
+ break;
+ }
+ });
+
+ client.start();
+
+ vertx.setTimer(2_000, x -> {
+ // start server after 2 seconds
+ startServerAsync();
+ });
+
+ async.await(15_000);
+
+ assertArrayEquals(new Object[]{
+ SessionState.CONNECTING,
+ SessionState.DISCONNECTED,
+
+ SessionState.CONNECTING,
+ SessionState.CONNECTED,
+ SessionState.DISCONNECTING,
+ SessionState.DISCONNECTED
+ }, sessionStates.toArray());
+ }
+
+ private SubscribeTestResult testSubscribe(Duration timeout, Consumer customizer, BiFunction, Boolean> completion) {
+
+ MqttClientSessionOptions options = new MqttClientSessionOptions()
+ .setPort(MQTT_SERVER_TLS_PORT)
+ .setHostname(MQTT_SERVER_HOST);
+ MqttClientSession client = MqttClientSession
+ .create(vertx, options);
+
+ customizer.accept(client);
+
+ Async async = ctx.async();
+
+ LinkedList events = new LinkedList<>();
+ client.subscriptionStateHandler(events::add);
+
+ List payloads = new LinkedList<>();
+ Async msgAsync = ctx.async();
+ client.messageHandler(msg -> {
+ payloads.add(new String[]{msg.topicName(), msg.payload().toString(StandardCharsets.UTF_8)});
+ if (completion.apply(client, payloads)) {
+ msgAsync.complete();
+ }
+ });
+
+ msgAsync.handler(x -> {
+
+ client.sessionStateHandler(event -> {
+ if (event.getSessionState() == SessionState.DISCONNECTED) {
+ async.complete();
+ }
+ });
+
+ client.stop();
+ });
+
+ client.start();
+
+ async.await(timeout.toMillis());
+
+ SubscribeTestResult result = new SubscribeTestResult();
+ result.events = events;
+ result.payloads = payloads;
+ return result;
+ }
+
+ /**
+ * Test subscribing.
+ */
+ @Test
+ public void testSubscribeDefault() {
+
+ startServer();
+
+ SubscribeTestResult result = testSubscribe(Duration.ofSeconds(5), session -> {
+ session.subscribe("qos0", RequestedQoS.QOS_1);
+ },
+ (session, payloads) -> payloads.size() == 2
+ );
+
+ // assert
+
+ assertArrayEquals(new Object[]{
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBING, null),
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBED, 0),
+ new SubscriptionEventImpl("qos0", SubscriptionState.UNSUBSCRIBED, null),
+ }, result.events.toArray());
+
+ assertArrayEquals(new Object[]{
+ new String[]{"qos0", "payload0"},
+ new String[]{"qos0", "payload1"}
+ }, result.payloads.toArray());
+
+ }
+
+ /**
+ * Test subscribing.
+ */
+ @Test
+ public void testSubscribeLate() {
+
+ vertx.setTimer(2_000, x -> startServerAsync());
+
+ SubscribeTestResult result = testSubscribe(Duration.ofSeconds(15), session -> {
+ session.subscribe("qos0", RequestedQoS.QOS_1);
+ },
+ (session, payloads) -> payloads.size() == 2
+ );
+
+ // assert
+
+ assertArrayEquals(new Object[]{
+ new SubscriptionEventImpl("qos0", SubscriptionState.UNSUBSCRIBED, null),
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBING, null),
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBED, 0),
+ new SubscriptionEventImpl("qos0", SubscriptionState.UNSUBSCRIBED, null),
+ }, result.events.toArray());
+
+ assertArrayEquals(new Object[]{
+ new String[]{"qos0", "payload0"},
+ new String[]{"qos0", "payload1"}
+ }, result.payloads.toArray());
+
+ }
+
+ /**
+ * Test subscribing and unsubscribing.
+ */
+ @Test
+ public void testUnsubscribeDefault() {
+
+ startServer();
+
+ SubscribeTestResult result = testSubscribe(Duration.ofSeconds(25),
+ session -> {
+ session.subscribe("qos0", RequestedQoS.QOS_1);
+ },
+ (session, payloads) -> {
+ if (payloads.size() == 2) {
+ // stop
+ session.unsubscribe("qos0");
+ vertx.setTimer(5_000, x -> {
+ session.subscribe("qos1", RequestedQoS.QOS_1);
+ });
+ }
+ return payloads.size() == 4;
+ }
+ );
+
+ // assert
+
+ assertArrayEquals(new Object[]{
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBING, null),
+ new SubscriptionEventImpl("qos0", SubscriptionState.SUBSCRIBED, 0),
+ new SubscriptionEventImpl("qos0", SubscriptionState.UNSUBSCRIBED, null),
+ new SubscriptionEventImpl("qos1", SubscriptionState.SUBSCRIBING, null),
+ new SubscriptionEventImpl("qos1", SubscriptionState.SUBSCRIBED, 1),
+ new SubscriptionEventImpl("qos1", SubscriptionState.UNSUBSCRIBED, null),
+ }, result.events.toArray());
+
+ assertArrayEquals(new Object[]{
+ new String[]{"qos0", "payload0"},
+ new String[]{"qos0", "payload1"},
+ new String[]{"qos1", "payload0"},
+ new String[]{"qos1", "payload1"}
+ }, result.payloads.toArray());
+
+ }
+
+ /**
+ * Test publishing, when it should fail
+ */
+ @Test
+ public void testPublishWhenDisconnected() {
+
+ MqttClientSessionOptions options = new MqttClientSessionOptions()
+ .setPort(MQTT_SERVER_TLS_PORT)
+ .setHostname(MQTT_SERVER_HOST);
+ MqttClientSession client = MqttClientSession
+ .create(vertx, options);
+
+ client.publish("foo", Buffer.buffer(), MqttQoS.AT_MOST_ONCE)
+ // this must fail as we are not connected
+ .onComplete(ctx.asyncAssertFailure());
+
+ }
+
+ /**
+ * Test publishing, when it should fail
+ */
+ @Test
+ public void testPublish() {
+
+ startServer();
+
+ MqttClientSessionOptions options = new MqttClientSessionOptions()
+ .setPort(MQTT_SERVER_TLS_PORT)
+ .setHostname(MQTT_SERVER_HOST);
+ MqttClientSession client = MqttClientSession
+ .create(vertx, options);
+
+ Async async = ctx.async();
+
+ client.sessionStateHandler(event -> {
+
+ switch (event.getSessionState()) {
+ case CONNECTED:
+ client.publishCompletionHandler(x -> client.stop());
+ client.publish("foo", Buffer.buffer("bar"), MqttQoS.AT_LEAST_ONCE);
+ break;
+ case DISCONNECTED:
+ async.complete();
+ break;
+ }
+ });
+
+ client.start();
+
+ async.await(5_000);
+
+ List expectedMessages = new LinkedList<>();
+ expectedMessages.add(MqttPublishMessage.create(
+ 0,
+ MqttQoS.AT_LEAST_ONCE,
+ false,
+ false,
+ "foo",
+ Buffer.buffer("bar").getByteBuf()
+ ));
+
+ assertEquals(expectedMessages.size(), serverMessages.size());
+ for (int i = 0; i < expectedMessages.size(); i++) {
+ MqttPublishMessage expected = expectedMessages.get(i);
+ MqttPublishMessage actual = serverMessages.get(i);
+
+ assertEquals(expected.topicName(), actual.topicName());
+ assertEquals(expected.payload(), actual.payload());
+ assertEquals(expected.qosLevel(), actual.qosLevel());
+ }
+
+ }
+
+}