diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java index 695722b794..ae79a8ce81 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java @@ -4,12 +4,13 @@ import java.util.concurrent.ConcurrentHashMap; import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor; -import io.vertx.mqtt.MqttClientOptions; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; +import io.smallrye.reactive.messaging.mqtt.session.SessionState; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState; import io.vertx.mutiny.core.Vertx; -import io.vertx.mutiny.mqtt.MqttClient; -import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage; import io.vertx.mutiny.mqtt.messages.MqttPublishMessage; public class Clients { @@ -20,24 +21,13 @@ private Clients() { // avoid direct instantiation. } - static Uni getConnectedClient(Vertx vertx, String host, int port, String server, - MqttClientOptions options) { - String id = host + port + "<" + (server == null ? "" : server) - + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]"; - ClientHolder holder = clients.computeIfAbsent(id, key -> { - MqttClient client = MqttClient.create(vertx, options); - return new ClientHolder(client, host, port, server); - }); - return holder.connect(); - } - static ClientHolder getHolder(Vertx vertx, String host, int port, String server, - MqttClientOptions options) { + MqttClientSessionOptions options) { String id = host + port + "<" + (server == null ? "" : server) + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]"; return clients.computeIfAbsent(id, key -> { - MqttClient client = MqttClient.create(vertx, options); + MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options); return new ClientHolder(client, host, port, server); }); } @@ -52,33 +42,53 @@ public static void clear() { public static class ClientHolder { - private final MqttClient client; - private final Uni connection; + private final MqttClientSession client; private final BroadcastProcessor messages; + private final BroadcastProcessor sessionState; + private final BroadcastProcessor subscriptionState; - public ClientHolder(MqttClient client, String host, int port, String server) { + public ClientHolder(MqttClientSession client, String host, int port, String server) { this.client = client; - this.connection = client.connect(port, host, server).memoize().indefinitely(); messages = BroadcastProcessor.create(); - client.publishHandler(messages::onNext); - client.closeHandler(messages::onComplete); + sessionState = BroadcastProcessor.create(); + subscriptionState = BroadcastProcessor.create(); + client.messageHandler(m -> messages.onNext(MqttPublishMessage.newInstance(m))); client.exceptionHandler(messages::onError); + client.sessionStateHandler(evt -> sessionState.onNext(evt.getSessionState())); + client.subscriptionStateHandler(subscriptionState::onNext); } - public Uni connect() { - return connection - .map(ignored -> client); + public void start() { + client.start(); } public void close() { - if (client.isConnected()) { - client.disconnectAndAwait(); - } + client.stop(); + } + + public Multi sessionState() { + return Multi.createFrom() + .item(this.client.getState()) + .onCompletion() + .switchTo(this.sessionState); + } + + public Multi subscriptionState(String topic) { + return Multi.createFrom() + .item(this.client.getSubscriptionState(topic)) + .onCompletion() + .switchTo(this.subscriptionState + .filter(evt -> evt.getTopic().equals(topic)) + .map(SubscriptionEvent::getSubscriptionState)); } public Multi stream() { return messages; } + + public MqttClientSession getClient() { + return client; + } } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java index 73cd1fb6df..af5f146dd9 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java @@ -42,7 +42,6 @@ @ConnectorAttribute(name = "will-retain", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the will message must be retained", defaultValue = "false") @ConnectorAttribute(name = "will-qos", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the QoS level for the will message", defaultValue = "0") @ConnectorAttribute(name = "max-message-size", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set max MQTT message size in bytes", defaultValue = "8092") -@ConnectorAttribute(name = "reconnect-attempts", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the max reconnect attempts", defaultValue = "5") @ConnectorAttribute(name = "reconnect-interval-seconds", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the reconnect interval in seconds", defaultValue = "1") @ConnectorAttribute(name = "username", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the username to connect to the server") @ConnectorAttribute(name = "password", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the password to connect to the server") @@ -99,7 +98,7 @@ public boolean isReady() { public boolean isSourceReady() { boolean ready = true; for (MqttSource source : sources) { - ready = ready && source.isSubscribed(); + ready = ready && source.isReady(); } return ready; } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java index 94372af6f5..d84ea1f2b6 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java @@ -3,13 +3,13 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.vertx.core.net.JksOptions; import io.vertx.core.net.KeyCertOptions; import io.vertx.core.net.PemKeyCertOptions; import io.vertx.core.net.PemTrustOptions; import io.vertx.core.net.PfxOptions; import io.vertx.core.net.TrustOptions; -import io.vertx.mqtt.MqttClientOptions; public class MqttHelpers { @@ -17,8 +17,8 @@ private MqttHelpers() { // avoid direct instantiation. } - static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) { - MqttClientOptions options = new MqttClientOptions(); + static MqttClientSessionOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) { + MqttClientSessionOptions options = new MqttClientSessionOptions(); options.setCleanSession(config.getAutoCleanSession()); options.setAutoGeneratedClientId(config.getAutoGeneratedClientId()); options.setAutoKeepAlive(config.getAutoKeepAlive()); @@ -28,7 +28,6 @@ static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguratio options.setMaxInflightQueue(config.getMaxInflightQueue()); options.setMaxMessageSize(config.getMaxMessageSize()); options.setPassword(config.getPassword().orElse(null)); - options.setReconnectAttempts(config.getReconnectAttempts()); options.setReconnectInterval(TimeUnit.SECONDS.toMillis(config.getReconnectIntervalSeconds())); options.setSsl(config.getSsl()); options.setKeyCertOptions(getKeyCertOptions(config)); diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java index a16ca7a092..3f40704aca 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java @@ -13,13 +13,15 @@ import io.netty.handler.codec.mqtt.MqttQoS; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.vertx.AsyncResultUni; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; +import io.smallrye.reactive.messaging.mqtt.session.SessionState; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.mqtt.MqttClientOptions; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.core.buffer.Buffer; -import io.vertx.mutiny.mqtt.MqttClient; public class MqttSink { @@ -30,10 +32,10 @@ public class MqttSink { private final int qos; private final SubscriberBuilder, Void> sink; - private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean ready = new AtomicBoolean(); public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) { - MqttClientOptions options = MqttHelpers.createMqttClientOptions(config); + MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config); host = config.getHost(); int def = options.isSsl() ? 8883 : 1883; port = config.getPort().orElse(def); @@ -41,58 +43,49 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) { topic = config.getTopic().orElseGet(config::getChannel); qos = config.getQos(); - AtomicReference reference = new AtomicReference<>(); + AtomicReference reference = new AtomicReference<>(); sink = ReactiveStreams.> builder() .flatMapCompletionStage(msg -> { - MqttClient client = reference.get(); - if (client != null) { - if (client.isConnected()) { - connected.set(true); - return CompletableFuture.completedFuture(msg); - } else { - CompletableFuture> future = new CompletableFuture<>(); - vertx.setPeriodic(100, id -> { - if (client.isConnected()) { - vertx.cancelTimer(id); - connected.set(true); - future.complete(msg); - } - }); - return future; - } - } else { - return Clients.getConnectedClient(vertx, host, port, server, options) - .map(c -> { - reference.set(c); - connected.set(true); - return msg; - }) - .subscribeAsCompletionStage(); + Clients.ClientHolder client = reference.get(); + if (client == null) { + client = Clients.getHolder(vertx, host, port, server, options); + // FIXME: add session state listener + client.start(); + reference.set(client); } + + return client.sessionState() + .filter(state -> state != SessionState.CONNECTED) + .map(ignore -> msg).toUni().subscribeAsCompletionStage(); + }) .flatMapCompletionStage(msg -> send(reference, msg)) .onComplete(() -> { - MqttClient c = reference.getAndSet(null); + Clients.ClientHolder c = reference.getAndSet(null); if (c != null) { - connected.set(false); - c.disconnectAndForget(); + ready.set(false); + c.close(); } }) .onError(log::errorWhileSendingMessageToBroker) .ignore(); } - private CompletionStage send(AtomicReference reference, Message msg) { - MqttClient client = reference.get(); - String actualTopicToBeUsed = this.topic; - MqttQoS actualQoS = MqttQoS.valueOf(this.qos); - boolean isRetain = false; + private CompletionStage send(AtomicReference reference, Message msg) { + MqttClientSession client = reference.get().getClient(); + final String actualTopicToBeUsed; + final MqttQoS actualQoS; + final boolean isRetain; if (msg instanceof SendingMqttMessage) { MqttMessage mm = ((SendingMqttMessage) msg); - actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic(); - actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel(); + actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic(); + actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : mm.getQosLevel(); isRetain = mm.isRetain(); + } else { + actualTopicToBeUsed = this.topic; + isRetain = false; + actualQoS = MqttQoS.valueOf(this.qos); } if (actualTopicToBeUsed == null) { @@ -100,7 +93,10 @@ private CompletionStage send(AtomicReference reference, Message toUni(h -> client + .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain) + .onComplete(h)) .onItemOrFailure().transformToUni((s, f) -> { if (f != null) { return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg)); @@ -139,6 +135,6 @@ public SubscriberBuilder, Void> getSink() { } public boolean isReady() { - return connected.get(); + return ready.get(); } } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index d9647270db..bfd9053d17 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -9,18 +9,20 @@ import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import io.vertx.mqtt.MqttClientOptions; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; +import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState; import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.mqtt.messages.MqttPublishMessage; public class MqttSource { private final PublisherBuilder> source; - private final AtomicBoolean subscribed = new AtomicBoolean(); + private final AtomicBoolean ready = new AtomicBoolean(); private final Pattern pattern; public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) { - MqttClientOptions options = MqttHelpers.createMqttClientOptions(config); + MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config); String host = config.getHost(); int def = options.isSsl() ? 8883 : 1883; @@ -41,22 +43,24 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) { } Clients.ClientHolder holder = Clients.getHolder(vertx, host, port, server, options); + holder.subscriptionState(topic) + .onItem().invoke(state -> { + ready.set(state == SubscriptionState.SUBSCRIBED); + }); + holder.start(); + holder.getClient() + .subscribe(topic, RequestedQoS.valueOf(qos)); this.source = ReactiveStreams.fromPublisher( - holder.connect() - .onItem().transformToMulti(client -> client.subscribe(topic, qos) - .onItem().transformToMulti(x -> { - subscribed.set(true); - return holder.stream() - .select().where(m -> matches(topic, m)) - .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)); - })) + holder.stream() + .select().where(m -> matches(topic, m)) + .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)) .stage(multi -> { if (broadcast) { return multi.broadcast().toAllSubscribers(); } return multi; }) - .onCancellation().invoke(() -> subscribed.set(false)) + .onCancellation().invoke(() -> holder.getClient().unsubscribe(topic)) .onFailure().invoke(log::unableToConnectToBroker)); } @@ -82,7 +86,8 @@ PublisherBuilder> getSource() { return source; } - boolean isSubscribed() { - return subscribed.get(); + public boolean isReady() { + return ready.get(); } + } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java index 1fe53d3906..d95f51b278 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java @@ -16,4 +16,7 @@ public interface MqttExceptions { @Message(id = 17000, value = "Unknown failure strategy: %s") IllegalArgumentException illegalArgumentUnknownStrategy(String strategy); + @Message(id = 17001, value = "Invalid QoS value: %s") + IllegalArgumentException illegalArgumentInvalidQoS(int qos); + } diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java new file mode 100644 index 0000000000..dd9b614308 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java @@ -0,0 +1,49 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import java.time.Duration; + +public class ConstantReconnectDelayOptions implements ReconnectDelayOptions { + + private static final Duration DEFAULT_DELAY = Duration.ofSeconds(10); + + private Duration delay = DEFAULT_DELAY; + + public ConstantReconnectDelayOptions() { + } + + public ConstantReconnectDelayOptions setDelay(Duration delay) { + this.delay = delay; + return this; + } + + public Duration getDelay() { + return this.delay; + } + + @Override + public ReconnectDelayProvider createProvider() { + + final Duration delay = this.delay; + + return new ReconnectDelayProvider() { + + @Override + public Duration nextDelay() { + return delay; + } + + @Override + public void reset() { + // no-op + } + }; + + } + + @Override + public ReconnectDelayOptions copy() { + ConstantReconnectDelayOptions result = new ConstantReconnectDelayOptions(); + result.delay = this.delay; + return result; + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java new file mode 100644 index 0000000000..dc340c3520 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java @@ -0,0 +1,119 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import java.time.Duration; + +public class ExponentialBackoffDelayOptions implements ReconnectDelayOptions { + + private static final Duration DEFAULT_MINIMUM = Duration.ofSeconds(1); + private static final Duration DEFAULT_INCREMENT = Duration.ofSeconds(1); + private static final Duration DEFAULT_MAXIMUM = Duration.ofMinutes(5); + + private Duration minimum = DEFAULT_MINIMUM; + private Duration increment = DEFAULT_INCREMENT; + private Duration maximum = DEFAULT_MAXIMUM; + + public ExponentialBackoffDelayOptions() { + } + + public ExponentialBackoffDelayOptions setIncrement(Duration increment) { + this.increment = increment; + return this; + } + + public Duration getIncrement() { + return this.increment; + } + + public ExponentialBackoffDelayOptions setMaximum(Duration maximum) { + this.maximum = maximum; + return this; + } + + public Duration getMaximum() { + return this.maximum; + } + + public ExponentialBackoffDelayOptions setMinimum(Duration minimum) { + this.minimum = minimum; + return this; + } + + public Duration getMinimum() { + return this.minimum; + } + + private void validate() { + if (this.minimum.isNegative()) { + throw new IllegalArgumentException("'minimum' must be a positive or zero duration"); + } + if (this.increment.isNegative() || this.increment.isZero()) { + throw new IllegalArgumentException("'increment' must be a positive duration"); + } + if (this.maximum.isNegative() || this.maximum.isZero()) { + throw new IllegalArgumentException("'maximum' must be a positive duration"); + } + if (this.maximum.compareTo(this.minimum) < 0) { + throw new IllegalArgumentException("'minimum' must be less than (or equal) to the maximum"); + } + } + + @Override + public ReconnectDelayProvider createProvider() { + validate(); + + long num = this.maximum.minus(this.minimum).toMillis() / this.increment.toMillis(); + long max = (long) (Math.log(num) / Math.log(2)) + 1; + + return new Provider(this.minimum, this.increment, this.maximum, max); + } + + @Override + public ReconnectDelayOptions copy() { + ExponentialBackoffDelayOptions result = new ExponentialBackoffDelayOptions(); + result.minimum = this.minimum; + result.increment = this.increment; + result.maximum = this.maximum; + return result; + } + + private static class Provider implements ReconnectDelayProvider { + + private final Duration minimum; + private final Duration increment; + private final Duration maximum; + private final long max; + + private long count; + + Provider(Duration minimum, Duration increment, Duration maximum, long max) { + this.minimum = minimum; + this.increment = increment; + this.maximum = maximum; + this.max = max; + } + + @Override + public Duration nextDelay() { + + if (this.count <= this.max) { + + Duration delay = this.minimum; + if (this.count > 0) { + delay = delay.plus(this.increment.multipliedBy((long) Math.pow(2, this.count - 1))); + } + + this.count += 1; + + return delay; + } else { + return this.maximum; + } + + } + + @Override + public void reset() { + this.count = 0; + } + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java new file mode 100644 index 0000000000..379d0636fe --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java @@ -0,0 +1,236 @@ +/* + * Copyright 2021 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.smallrye.reactive.messaging.mqtt.session; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.netty.handler.codec.mqtt.MqttQoS; +import io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl; +import io.vertx.codegen.annotations.Fluent; +import io.vertx.codegen.annotations.GenIgnore; +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.messages.MqttPublishMessage; + +/** + * An MQTT client session. + */ +@VertxGen +public interface MqttClientSession { + + /** + * Create a new MQTT client session. + *

+ * The session will initially be disconnected, and must be started using {@link #start()}. + * + * @param vertx Vert.x instance + * @param options MQTT client session options + * @return MQTT client session instance + */ + static MqttClientSession create(Vertx vertx, MqttClientSessionOptions options) { + return new MqttClientSessionImpl(vertx, options); + } + + /** + * Set the session state handler. + * + * @param sessionStateHandler The new handler, will overwrite the old one. + * @return current MQTT client session instance + */ + @Fluent + MqttClientSession sessionStateHandler(Handler sessionStateHandler); + + /** + * Set the subscription state handler. + * + * @param subscriptionStateHandler The new handler, will overwrite the old one. + * @return current MQTT client session instance + */ + @Fluent + MqttClientSession subscriptionStateHandler(Handler subscriptionStateHandler); + + /** + * Set the publish complete handler. + * + * @param publishCompleteHandler The new handler, will overwrite the old one. + * @return current MQTT client session instance + * @see MqttClient#publishCompletionHandler(Handler) + */ + @Fluent + MqttClientSession publishCompletionHandler(Handler publishCompleteHandler); + + /** + * Set the publish completion expiration handler. + * + * @param publishCompletionExpirationHandler The new handler, will overwrite the old one. + * @return current MQTT client session instance + * @see MqttClient#publishCompletionExpirationHandler(Handler) + */ + @Fluent + MqttClientSession publishCompletionExpirationHandler(Handler publishCompletionExpirationHandler); + + /** + * Set the publish completion unknown packet id handler. + * + * @param publishCompletionUnknownPacketIdHandler The new handler, will overwrite the old one. + * @return current MQTT client session instance + * @see MqttClient#publishCompletionUnknownPacketIdHandler(Handler) + */ + @Fluent + MqttClientSession publishCompletionUnknownPacketIdHandler(Handler publishCompletionUnknownPacketIdHandler); + + /** + * Start the session. This will try to drive the connection to {@link SessionState#CONNECTED}. + */ + void start(); + + /** + * Stop the session. This will try to drive the connection to {@link SessionState#DISCONNECTED}. + */ + void stop(); + + /** + * Get the current session state. + * + * @return The current state. + */ + SessionState getState(); + + /** + * Get a current subscription state. + * + * @param topicFilter The topic filter to get the state for. + * @return The current state of the requested subscription. + */ + SubscriptionState getSubscriptionState(String topicFilter); + + /** + * Check if the session is currently connected. + * + * @return {@code true} if the session is currently connected, {@code false} otherwise. + */ + default boolean isConnected() { + return getState() == SessionState.CONNECTED; + } + + /** + * Subscribes to the topics with related QoS levels + * + * @param topics topics and related QoS levels to subscribe to + * @return current MQTT client session instance + */ + @Fluent + MqttClientSession subscribe(Map topics); + + /** + * Subscribes to a single topic with related QoS level. + * + * @param topic The topic to subscribe to. + * @param qos The QoS to request from the server. + * @return current MQTT client session instance + */ + @Fluent + default MqttClientSession subscribe(String topic, RequestedQoS qos) { + return subscribe(Collections.singletonMap(topic, qos)); + } + + /** + * Subscribes to a list of topics, with the same QoS. + * + * @param qos The QoS to use. + * @param topics The topics to subscribe to. + * @return current MQTT client session instance + */ + @Fluent + @GenIgnore + default MqttClientSession subscribe(RequestedQoS qos, String... topics) { + final Map topicMap = new LinkedHashMap<>(topics.length); + for (String topic : topics) { + topicMap.put(topic, qos); + } + return subscribe(topicMap); + } + + /** + * Unsubscribe from receiving messages on given topics + * + * @param topics Topics you want to unsubscribe from + * @return current MQTT client session instance + */ + MqttClientSession unsubscribe(List topics); + + /** + * Unsubscribe from receiving messages on given topics + * + * @param topics Topics you want to unsubscribe from + * @return current MQTT client session instance + */ + @GenIgnore + default MqttClientSession unsubscribe(String... topics) { + return unsubscribe(Arrays.asList(topics)); + } + + /** + * Sets handler which will be called each time server publish something to client + * + * @param messageHandler handler to call + * @return current MQTT client session instance + */ + @Fluent + MqttClientSession messageHandler(Handler messageHandler); + + /** + * Sets handler which will be called in case of an exception + * + * @param exceptionHandler handler to call + * @return current MQTT client session instance + */ + @Fluent + MqttClientSession exceptionHandler(Handler exceptionHandler); + + /** + * Sends the PUBLISH message to the remote MQTT server + * + * @param topic topic on which the message is published + * @param payload message payload + * @param qosLevel QoS level + * @param isDup if the message is a duplicate + * @param isRetain if the message needs to be retained + * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0) + */ + Future publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain); + + /** + * Sends the PUBLISH message to the remote MQTT server + * + * @param topic topic on which the message is published + * @param payload message payload + * @param qosLevel QoS level + * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0) + */ + default Future publish(String topic, Buffer payload, MqttQoS qosLevel) { + return publish(topic, payload, qosLevel, false, false); + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java new file mode 100644 index 0000000000..997ff08ec5 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java @@ -0,0 +1,58 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import io.vertx.mqtt.MqttClientOptions; + +public class MqttClientSessionOptions extends MqttClientOptions { + + private static final ReconnectDelayOptions DEFAULT_RECONNECT_DELAY = new ConstantReconnectDelayOptions(); + + private String hostname = MqttClientOptions.DEFAULT_HOST; + private int port = MqttClientOptions.DEFAULT_PORT; + private ReconnectDelayOptions reconnectDelay = DEFAULT_RECONNECT_DELAY; + + /** + * Default constructor + */ + public MqttClientSessionOptions() { + super(); + } + + /** + * Copy constructor + * + * @param other the options to copy + */ + public MqttClientSessionOptions(MqttClientSessionOptions other) { + super(other); + this.hostname = other.hostname; + this.port = other.port; + this.reconnectDelay = other.reconnectDelay.copy(); + } + + public int getPort() { + return this.port; + } + + public MqttClientSessionOptions setPort(int port) { + this.port = port; + return this; + } + + public String getHostname() { + return this.hostname; + } + + public MqttClientSessionOptions setHostname(String hostname) { + this.hostname = hostname; + return this; + } + + public MqttClientSessionOptions setReconnectDelay(ReconnectDelayOptions reconnectDelay) { + this.reconnectDelay = reconnectDelay; + return this; + } + + public ReconnectDelayOptions getReconnectDelay() { + return this.reconnectDelay; + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java new file mode 100644 index 0000000000..40c4aacdf5 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 Red Hat Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.smallrye.reactive.messaging.mqtt.session; + +public interface ReconnectDelayOptions { + + ReconnectDelayProvider createProvider(); + + ReconnectDelayOptions copy(); + +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java new file mode 100644 index 0000000000..a4af273276 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java @@ -0,0 +1,11 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import java.time.Duration; + +public interface ReconnectDelayProvider { + + Duration nextDelay(); + + void reset(); + +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java new file mode 100644 index 0000000000..6e1c024ef7 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java @@ -0,0 +1,37 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions; + +/** + * The requested QoS level. + *

+ * NOTE: This is missing QoS 2, as this mode is not properly supported by the session. + */ +public enum RequestedQoS { + QOS_0(0), + QOS_1(1); + + private final int value; + + RequestedQoS(int value) { + this.value = value; + } + + public int toInteger() { + return this.value; + } + + public static RequestedQoS valueOf(Integer qos) { + if (qos == null) { + return null; + } + switch (qos) { + case 0: + return RequestedQoS.QOS_0; + case 1: + return RequestedQoS.QOS_1; + default: + throw MqttExceptions.ex.illegalArgumentInvalidQoS(qos); + } + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java new file mode 100644 index 0000000000..36ad1e79ef --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java @@ -0,0 +1,10 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import io.vertx.codegen.annotations.VertxGen; + +@VertxGen +public interface SessionEvent { + SessionState getSessionState(); + + Throwable getCause(); +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java new file mode 100644 index 0000000000..5fe99b9abd --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java @@ -0,0 +1,27 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +/** + * The state of the session. + */ +public enum SessionState { + /** + * The session is disconnected. + *

+ * A re-connect timer may be pending. + */ + DISCONNECTED, + /** + * The session started to connect. + *

+ * This may include re-subscribing to any topics after the connect call was successful. + */ + CONNECTING, + /** + * The session is connected. + */ + CONNECTED, + /** + * The session is in the process of an orderly disconnect. + */ + DISCONNECTING, +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java new file mode 100644 index 0000000000..76e4bb3ee3 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java @@ -0,0 +1,12 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +import io.vertx.codegen.annotations.VertxGen; + +@VertxGen +public interface SubscriptionEvent { + Integer getQos(); + + SubscriptionState getSubscriptionState(); + + String getTopic(); +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java new file mode 100644 index 0000000000..c6eec73ae2 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java @@ -0,0 +1,29 @@ +package io.smallrye.reactive.messaging.mqtt.session; + +/** + * The state of a subscription. + *

+ * Subscriptions established when a new topic gets added, or the connection was established. If the subscribe call + * returns an error for the subscription, the state will remain {@link #FAILED} and it will not try to re-subscribe + * while the connection is active. + *

+ * When the session (connection) disconnects, all subscriptions will automatically be reset to {@link #UNSUBSCRIBED}. + */ +public enum SubscriptionState { + /** + * The topic is not subscribed. + */ + UNSUBSCRIBED, + /** + * The topic is in the process of subscribing. + */ + SUBSCRIBING, + /** + * The topic is subscribed. + */ + SUBSCRIBED, + /** + * The topic could not be subscribed. + */ + FAILED, +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java new file mode 100644 index 0000000000..020bee682e --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java @@ -0,0 +1,661 @@ +package io.smallrye.reactive.messaging.mqtt.session.impl; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import io.netty.handler.codec.mqtt.MqttQoS; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession; +import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; +import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayProvider; +import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS; +import io.smallrye.reactive.messaging.mqtt.session.SessionEvent; +import io.smallrye.reactive.messaging.mqtt.session.SessionState; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState; +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxException; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.messages.MqttConnAckMessage; +import io.vertx.mqtt.messages.MqttPublishMessage; +import io.vertx.mqtt.messages.MqttSubAckMessage; + +public class MqttClientSessionImpl implements MqttClientSession { + + private static final Logger log = LoggerFactory.getLogger(MqttClientSessionImpl.class); + + private final VertxInternal vertx; + private final MqttClientSessionOptions options; + + // record the subscriptions + private final Map subscriptions = new HashMap<>(); + // record the pending subscribes + private final Map> pendingSubscribes = new HashMap<>(); + // record the pending unsubscribes + private final Map> pendingUnsubscribes = new HashMap<>(); + // the provider for the reconnect delay + private final ReconnectDelayProvider reconnectDelay; + + // the current state + private volatile SessionState state = SessionState.DISCONNECTED; + // drives to connection either to CONNECTED or DISCONNECTED + private volatile boolean running; + // subscription states + private final Map subscriptionStates = new ConcurrentHashMap<>(); + + // holds the actual MQTT client connection + private MqttClient client; + // an optional reconnect timer + private Long reconnectTimer; + + private volatile Handler messageHandler; + private volatile Handler exceptionHandler; + private volatile Handler sessionStateHandler; + private volatile Handler subscriptionStateHandler; + private volatile Handler publishCompleteHandler; + private volatile Handler publishCompletionExpirationHandler; + private volatile Handler publishCompletionUnknownPacketIdHandler; + + /** + * Create a new instance, which is not started. + * + * @param vertx The vert.x instance to use. + * @param options The client session options. + */ + public MqttClientSessionImpl(final Vertx vertx, final MqttClientSessionOptions options) { + this.vertx = (VertxInternal) vertx; + this.options = options; + this.reconnectDelay = options.getReconnectDelay().createProvider(); + } + + @Override + public void start() { + this.vertx.runOnContext(x -> doStart()); + } + + @Override + public void stop() { + this.vertx.runOnContext(x -> doStop()); + } + + @Override + public SessionState getState() { + return this.state; + } + + @Override + public SubscriptionState getSubscriptionState(String topicFilter) { + return this.subscriptionStates.get(topicFilter); + } + + @Override + public MqttClientSession subscribe(Map topics) { + final Map finalTopics = new LinkedHashMap<>(topics); + this.vertx.runOnContext(x -> doSubscribe(finalTopics)); + return this; + } + + @Override + public MqttClientSession unsubscribe(List topics) { + final Set finalTopics = new HashSet<>(topics); + this.vertx.runOnContext(x -> doUnsubscribe(finalTopics)); + return this; + } + + private void doStart() { + if (this.running) { + // nothing to do + return; + } + + // we connect, not re-connect + this.reconnectDelay.reset(); + + this.running = true; + switch (this.state) { + case DISCONNECTED: + // initiate connection + createConnection(); + break; + case CONNECTING: + // nothing to do + break; + case CONNECTED: + // nothing to do + break; + case DISCONNECTING: + // we do nothing here and wait until the disconnection advanced, which will then trigger a re-connect + break; + } + } + + private void doStop() { + if (!this.running) { + // nothing to do + return; + } + + this.running = false; + + if (this.reconnectTimer != null) { + // we have a re-connect scheduled, but stop right now. + this.vertx.cancelTimer(this.reconnectTimer); + } + + switch (this.state) { + case CONNECTED: + closeConnection((Throwable) null); + break; + case DISCONNECTED: + // nothing to do + break; + case DISCONNECTING: + // nothing do do + break; + case CONNECTING: + // we do nothing here and wait, until the connection advanced, which will then trigger a disconnect + break; + } + } + + @Override + public MqttClientSession exceptionHandler(Handler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + return this; + } + + @Override + public MqttClientSession sessionStateHandler(Handler sessionStateHandler) { + this.sessionStateHandler = sessionStateHandler; + return this; + } + + @Override + public MqttClientSession subscriptionStateHandler(Handler subscriptionStateHandler) { + this.subscriptionStateHandler = subscriptionStateHandler; + return this; + } + + @Override + public MqttClientSession publishCompletionHandler(Handler publishCompleteHandler) { + this.publishCompleteHandler = publishCompleteHandler; + return this; + } + + @Override + public MqttClientSession publishCompletionExpirationHandler(Handler publishCompletionExpirationHandler) { + this.publishCompletionExpirationHandler = publishCompletionExpirationHandler; + return this; + } + + @Override + public MqttClientSession publishCompletionUnknownPacketIdHandler(Handler publishCompletionUnknownPacketIdHandler) { + this.publishCompletionUnknownPacketIdHandler = publishCompletionUnknownPacketIdHandler; + return this; + } + + @Override + public MqttClientSession messageHandler(Handler messageHandler) { + this.messageHandler = messageHandler; + return this; + } + + /** + * Set the state of the session. + * + * @param sessionState The new state. + * @param cause The optional cause, in case of an error. + */ + private void setState(final SessionState sessionState, final Throwable cause) { + + if (log.isDebugEnabled()) { + log.debug(String.format("setState - current: %s, next: %s", this.state, sessionState), cause); + } + + // before announcing our state change + + switch (sessionState) { + case CONNECTING: + break; + case CONNECTED: + // successful connection, reset delay + this.reconnectDelay.reset(); + break; + case DISCONNECTING: + break; + case DISCONNECTED: + this.pendingUnsubscribes.clear(); + this.pendingSubscribes.clear(); + for (String topic : this.subscriptions.keySet()) { + notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null); + } + break; + } + + // announce state change + + if (this.state != sessionState) { + this.state = sessionState; + Handler handler = this.sessionStateHandler; + if (handler != null) { + handler.handle(new SessionEventImpl(sessionState, cause)); + } + } + + // after announcing out state change + + switch (this.state) { + case CONNECTING: + // we just wait for the outcome + break; + case CONNECTED: + if (!this.running) { + closeConnection((Throwable) null); + } + break; + case DISCONNECTING: + // we just wait for the outcome + break; + case DISCONNECTED: + if (this.running) { + scheduleReconnect(); + } + break; + } + } + + private void notifySubscriptionState(final String topic, final SubscriptionState state, final Integer grantedQoS) { + + if (log.isDebugEnabled()) { + log.debug(String.format("setSubscriptionState - topic: %s, state: %s, grantedQoS: %s", topic, state, grantedQoS)); + } + + this.subscriptionStates.put(topic, state); + + Handler handler = this.subscriptionStateHandler; + if (handler != null) { + handler.handle(new SubscriptionEventImpl(topic, state, grantedQoS)); + } + + } + + private void scheduleReconnect() { + log.debug("Scheduling reconnect"); + + if (this.reconnectTimer == null) { + + final Duration delay = nextDelay(); + if (log.isDebugEnabled()) { + log.debug("Next delay: " + delay); + } + + final long timer = vertx.setTimer(delay.toMillis(), x -> createConnection()); + if (log.isDebugEnabled()) { + log.debug("Timer set: " + timer); + } + + this.reconnectTimer = timer; + } + } + + /** + * Calculate the next delay before trying to re-connect. + * + * @return The duration to wait. + */ + private Duration nextDelay() { + return this.reconnectDelay.nextDelay(); + } + + /** + * Initiates the connection. + */ + private void createConnection() { + log.debug("Creating connection"); + + // clear reconnect timer + this.reconnectTimer = null; + + // create client + this.client = MqttClient.create(this.vertx, this.options); + this.client.exceptionHandler(this::exceptionCaught); + this.client.closeHandler(x -> connectionClosed()); + this.client.publishHandler(this::serverPublished); + this.client.subscribeCompletionHandler(this::subscribeCompleted); + this.client.unsubscribeCompletionHandler(this::unsubscribeCompleted); + this.client.publishCompletionHandler(this::publishComplete); + this.client.publishCompletionExpirationHandler(this::publishExpired); + this.client.publishCompletionUnknownPacketIdHandler(this::publishCompletionUnknown); + + // change state + setState(SessionState.CONNECTING, null); + // start connection + this.client.connect(this.options.getPort(), this.options.getHostname()).onComplete(this::connectCompleted); + } + + /** + * Handle a caught exception. + */ + private void exceptionCaught(Throwable cause) { + log.debug("Caught exception", cause); + closeConnection(cause); + Handler exceptionHandler = this.exceptionHandler; + if (exceptionHandler != null) { + exceptionHandler.handle(cause); + } + } + + /** + * Initiates the connection shutdown. + */ + private void closeConnection(Throwable cause) { + log.debug("Closing connection", cause); + + setState(SessionState.DISCONNECTING, cause); + this.client.disconnect().onComplete(this::disconnectCompleted); + } + + /** + * Gets called when the connect call was processed. + * + * @param result The outcome of the connect call. + */ + private void connectCompleted(AsyncResult result) { + + if (log.isDebugEnabled()) { + log.debug(String.format("Connect completed - result: %s, cause: %s", result.result(), result.cause())); + } + + if (result.failed() || result.result() == null) { + // this will include CONACKs with error codes + setState(SessionState.DISCONNECTED, result.cause()); + return; + } + + MqttConnAckMessage ack = result.result(); + + setState(SessionState.CONNECTED, null); + + if (!this.subscriptions.isEmpty() && (options.isCleanSession() || !ack.isSessionPresent())) { + // re-subscribe if we have requested subscriptions and (either cleanSession=true or no session found on the server) + requestSubscribe(new LinkedHashMap<>(this.subscriptions)); + } + } + + /** + * Gets called when the disconnect call was processed. + * + * @param result The outcome of the disconnect call. + */ + private void disconnectCompleted(AsyncResult result) { + + if (log.isDebugEnabled()) { + log.debug(String.format("Disconnect completed - result: %s, cause: %s", result.result(), result.cause())); + } + + connectionClosed(result.cause()); + } + + /** + * Gets called internally when the only reasonable action is to just disconnect. + *

+ * If the session is still running, then it will trigger a re-connect. + * + * @param reason The reason message. + */ + private void closeConnection(final String reason) { + closeConnection(new VertxException(reason).fillInStackTrace()); + } + + /** + * Gets called when the connection just dropped. + */ + private void connectionClosed() { + if (this.state != SessionState.DISCONNECTING) { + // this came unexpected + connectionClosed(new VertxException("Connection closed")); + } + } + + /** + * Called to clean up the after a connection was closed. + * + * @param cause The cause of the connection closure. + */ + private void connectionClosed(final Throwable cause) { + log.info("Connection closed", cause); + + if (this.client != null) { + this.client.exceptionHandler(null); + this.client.publishHandler(null); + this.client.closeHandler(null); + this.client.subscribeCompletionHandler(null); + this.client.publishCompletionHandler(null); + this.client.publishCompletionExpirationHandler(null); + this.client.publishCompletionUnknownPacketIdHandler(null); + this.client = null; + } + setState(SessionState.DISCONNECTED, cause); + } + + /** + * Gets called when the server published a message for us. + * + * @param message The published message. + */ + private void serverPublished(MqttPublishMessage message) { + if (log.isDebugEnabled()) { + log.debug("Server published: " + message); + } + + Handler publishHandler = this.messageHandler; + if (publishHandler != null) { + publishHandler.handle(message); + } + } + + /** + * Perform subscribing. + * + * @param topics The topics to subscribe to. + */ + private void doSubscribe(Map topics) { + final LinkedHashMap subscriptions = new LinkedHashMap<>(topics.size()); + + for (Map.Entry entry : topics.entrySet()) { + this.subscriptions.compute(entry.getKey(), (key, current) -> { + if (current != entry.getValue()) { + subscriptions.put(entry.getKey(), entry.getValue()); + } + return entry.getValue(); + }); + } + + if (log.isDebugEnabled()) { + log.debug("Requesting subscribe: " + subscriptions); + } + requestSubscribe(subscriptions); + } + + /** + * Perform unsubscribing. + * + * @param topics The topics to unsubscribe from. + */ + private void doUnsubscribe(Set topics) { + final List topicsToSend = new ArrayList<>(topics.size()); + for (String topic : topics) { + if (this.subscriptions.remove(topic) != null) { + topicsToSend.add(topic); + } + } + + if (log.isDebugEnabled()) { + log.debug("Requesting unsubscribe: " + topicsToSend); + } + requestUnsubscribe(topicsToSend); + } + + /** + * Request to subscribe from the server. + * + * @param topics The topics to subscribe to, including the requested QoS. + */ + private void requestSubscribe(LinkedHashMap topics) { + if (topics.isEmpty() || this.client == null) { + // nothing to do + return; + } + + this.client + .subscribe(topics.entrySet() + .stream().collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().toInteger()))) + .onComplete(result -> subscribeSent(result, topics)); + } + + /** + * Request to unsubscribe from the server. + * + * @param topics The topic to unsubscribe from. + */ + private void requestUnsubscribe(List topics) { + if (topics.isEmpty() || this.client == null) { + // nothing to do + return; + } + + for (String topic : topics) { + // vertx-mqtt currently does not support unsubscribing from multi-topics due to an API limitation + this.client + .unsubscribe(topic) + .onComplete(result -> unsubscribeSent(result, Collections.singletonList(topic))); + } + } + + /** + * Called when the subscribe call was sent. + * + * @param result The result of sending the request, contains the packet id. + */ + private void subscribeSent(AsyncResult result, LinkedHashMap topics) { + if (result.failed() || result.result() == null) { + // failed + for (String topic : topics.keySet()) { + notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null); + } + } else { + // record request + for (String topic : topics.keySet()) { + notifySubscriptionState(topic, SubscriptionState.SUBSCRIBING, null); + } + this.pendingSubscribes.put(result.result(), topics); + } + } + + /** + * Called when the unsubscribe call was sent. + * + * @param result The result of sending the request, contains the packet id. + */ + private void unsubscribeSent(AsyncResult result, List topics) { + if (result.failed() || result.result() == null) { + closeConnection(String.format("Failed to send unsubscribe request: %s", result.cause())); + } else { + this.pendingUnsubscribes.put(result.result(), topics); + } + } + + /** + * Called when the server processed the request to subscribe. + * + * @param ack The acknowledge message. + */ + private void subscribeCompleted(MqttSubAckMessage ack) { + LinkedHashMap request = this.pendingSubscribes.remove(ack.messageId()); + if (request == null) { + closeConnection(String.format("Unexpected subscription ack response - messageId: %s", ack.messageId())); + return; + } + if (request.size() != ack.grantedQoSLevels().size()) { + closeConnection(String.format("Mismatch of topics on subscription ack - expected: %d, actual: %d", request.size(), + ack.grantedQoSLevels().size())); + return; + } + + int idx = 0; + for (String topic : request.keySet()) { + Integer grantedQoS = ack.grantedQoSLevels().get(idx); + notifySubscriptionState(topic, SubscriptionState.SUBSCRIBED, grantedQoS); + idx += 1; + } + } + + /** + * Called when the server processed the request to unsubscribe. + * + * @param messageId The ID of the message that completed. + */ + private void unsubscribeCompleted(Integer messageId) { + List request = this.pendingUnsubscribes.remove(messageId); + for (String topic : request) { + notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null); + } + } + + @Override + public Future publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) { + Promise future = Promise.promise(); + this.vertx + .runOnContext(x -> doPublish(topic, payload, qosLevel, isDup, isRetain) + .onComplete(future)); + return future.future(); + } + + private Future doPublish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) { + if (this.client != null) { + return this.client.publish(topic, payload, qosLevel, isDup, isRetain); + } else { + return Future.failedFuture("Session is not connected"); + } + } + + private void publishComplete(Integer messageId) { + Handler handler = this.publishCompleteHandler; + if (handler != null) { + handler.handle(messageId); + } + } + + private void publishExpired(Integer messageId) { + Handler handler = this.publishCompletionExpirationHandler; + if (handler != null) { + handler.handle(messageId); + } + } + + private void publishCompletionUnknown(Integer messageId) { + Handler handler = this.publishCompletionUnknownPacketIdHandler; + if (handler != null) { + handler.handle(messageId); + } + } + +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java new file mode 100644 index 0000000000..41dfc6ed50 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java @@ -0,0 +1,38 @@ +package io.smallrye.reactive.messaging.mqtt.session.impl; + +import io.smallrye.reactive.messaging.mqtt.session.SessionEvent; +import io.smallrye.reactive.messaging.mqtt.session.SessionState; + +/** + * An event of a session state change. + */ +public class SessionEventImpl implements SessionEvent { + + private final SessionState sessionState; + private final Throwable cause; + + public SessionEventImpl(final SessionState sessionState, final Throwable reason) { + this.sessionState = sessionState; + this.cause = reason; + } + + /** + * The new state of the session. + * + * @return The state. + */ + @Override + public SessionState getSessionState() { + return this.sessionState; + } + + /** + * The (optional) cause of change. + * + * @return The throwable that causes the state change, or {@code null}, if there was none. + */ + @Override + public Throwable getCause() { + return this.cause; + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java new file mode 100644 index 0000000000..e3b8ba09d1 --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java @@ -0,0 +1,77 @@ +package io.smallrye.reactive.messaging.mqtt.session.impl; + +import java.util.Objects; +import java.util.StringJoiner; + +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent; +import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState; + +/** + * An event of a subscription state change. + */ +public class SubscriptionEventImpl implements SubscriptionEvent { + private final String topic; + private final SubscriptionState subscriptionState; + private final Integer qos; + + public SubscriptionEventImpl(final String topic, final SubscriptionState subscriptionState, final Integer qos) { + this.topic = topic; + this.subscriptionState = subscriptionState; + this.qos = qos; + } + + /** + * The granted QoS level from the server. + * + * @return When the state changed to {@link SubscriptionState#SUBSCRIBED}, it contains the QoS level granted by + * the server. Otherwise it will be {@code null}. + */ + @Override + public Integer getQos() { + return this.qos; + } + + /** + * The new subscription state. + * + * @return The state. + */ + @Override + public SubscriptionState getSubscriptionState() { + return this.subscriptionState; + } + + /** + * The name of the topic this change refers to. + * + * @return The topic name. + */ + @Override + public String getTopic() { + return this.topic; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + SubscriptionEventImpl that = (SubscriptionEventImpl) o; + return topic.equals(that.topic) && subscriptionState == that.subscriptionState && Objects.equals(qos, that.qos); + } + + @Override + public int hashCode() { + return Objects.hash(topic, subscriptionState, qos); + } + + @Override + public String toString() { + return new StringJoiner(", ", SubscriptionEventImpl.class.getSimpleName() + "[", "]") + .add("topic='" + topic + "'") + .add("subscriptionState=" + subscriptionState) + .add("qos=" + qos) + .toString(); + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java index 713d72930f..3b40abe71b 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java @@ -43,7 +43,7 @@ public void testSource() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, counter::getAndIncrement)).start(); @@ -69,7 +69,7 @@ public void testSourceUsingChannelName() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, counter::getAndIncrement)).start(); @@ -101,7 +101,7 @@ public void testBroadcast() { stream.forEach(messages1::add).run(); stream.forEach(messages2::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, @@ -142,7 +142,7 @@ public void testWithVeryLargeMessage() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); new Thread(() -> usage.produce(topic, 10, null, () -> large)).start(); diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java index f3125f252e..f3774ca6f9 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java @@ -52,7 +52,7 @@ public void testMutualTLS() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java index 3af0a1cb17..b9300fc46f 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java @@ -45,7 +45,7 @@ public void testSecureSource() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null, diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java index cbeaf564cd..84a835f578 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java @@ -48,7 +48,7 @@ public void testTLS() { List> messages = new ArrayList<>(); PublisherBuilder> stream = source.getSource(); stream.forEach(messages::add).run(); - await().until(source::isSubscribed); + await().until(source::isReady); pause(); AtomicInteger counter = new AtomicInteger(); new Thread(() -> usage.produceIntegers(topic, 10, null,