Skip to content

Commit

Permalink
[smallrye#1181] Fix broken MQTT client by implementing a reconnect "s…
Browse files Browse the repository at this point in the history
…ession"

This is based on the content of PR
vert-x3/vertx-mqtt#197
  • Loading branch information
ctron committed May 27, 2021
1 parent 6b90711 commit 566bc7d
Show file tree
Hide file tree
Showing 26 changed files with 1,548 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ Type: _int_ | false | `0`

Type: _int_ | false | `8092`

| *reconnect-attempts* | Set the max reconnect attempts

Type: _int_ | false | `5`

| *reconnect-interval-seconds* | Set the reconnect interval in seconds

Type: _int_ | false | `1`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ Type: _int_ | false |

Type: _int_ | false | `0`

| *reconnect-attempts* | Set the max reconnect attempts

Type: _int_ | false | `5`

| *reconnect-interval-seconds* | Set the reconnect interval in seconds

Type: _int_ | false | `1`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import java.util.concurrent.ConcurrentHashMap;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.mqtt.MqttClientOptions;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;

public class Clients {
Expand All @@ -20,25 +21,23 @@ private Clients() {
// avoid direct instantiation.
}

static Uni<MqttClient> getConnectedClient(Vertx vertx, String host, int port, String server,
MqttClientOptions options) {
String id = host + port + "<" + (server == null ? "" : server)
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
ClientHolder holder = clients.computeIfAbsent(id, key -> {
MqttClient client = MqttClient.create(vertx, options);
return new ClientHolder(client, host, port, server);
});
return holder.connect();
}
static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions options) {

static ClientHolder getHolder(Vertx vertx, String host, int port, String server,
MqttClientOptions options) {
String host = options.getHostname();
int port = options.getPort();
String clientId = options.getClientId();
String server = options.getServerName().orElse(null);
String username = options.getUsername();
String password = options.getPassword();

String id = host + port + "<" + (server == null ? "" : server)
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
String id = username + ":" + password + "@"
+ host + ":"
+ port
+ "<" + (server == null ? "" : server)
+ ">-[" + (clientId != null ? clientId : "") + "]";
return clients.computeIfAbsent(id, key -> {
MqttClient client = MqttClient.create(vertx, options);
return new ClientHolder(client, host, port, server);
MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options);
return new ClientHolder(client);
});
}

Expand All @@ -52,33 +51,53 @@ public static void clear() {

public static class ClientHolder {

private final MqttClient client;
private final Uni<MqttConnAckMessage> connection;
private final MqttClientSession client;
private final BroadcastProcessor<MqttPublishMessage> messages;
private final BroadcastProcessor<SessionState> sessionState;
private final BroadcastProcessor<SubscriptionEvent> subscriptionState;

public ClientHolder(MqttClient client, String host, int port, String server) {
public ClientHolder(MqttClientSession client) {
this.client = client;
this.connection = client.connect(port, host, server).memoize().indefinitely();
messages = BroadcastProcessor.create();
client.publishHandler(messages::onNext);
client.closeHandler(messages::onComplete);
sessionState = BroadcastProcessor.create();
subscriptionState = BroadcastProcessor.create();
client.messageHandler(m -> messages.onNext(MqttPublishMessage.newInstance(m)));
client.exceptionHandler(messages::onError);
client.sessionStateHandler(evt -> sessionState.onNext(evt.getSessionState()));
client.subscriptionStateHandler(subscriptionState::onNext);
}

public Uni<MqttClient> connect() {
return connection
.map(ignored -> client);
public void start() {
client.start();
}

public void close() {
if (client.isConnected()) {
client.disconnectAndAwait();
}
client.stop();
}

public Multi<SessionState> sessionState() {
return Multi.createFrom()
.item(this.client.getState())
.onCompletion()
.switchTo(this.sessionState);
}

public Multi<SubscriptionState> subscriptionState(String topic) {
return Multi.createFrom()
.item(this.client.getSubscriptionState(topic))
.onCompletion()
.switchTo(this.subscriptionState
.filter(evt -> evt.getTopic().equals(topic))
.map(SubscriptionEvent::getSubscriptionState));
}

public Multi<MqttPublishMessage> stream() {
return messages;
}

public MqttClientSession getClient() {
return client;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
@ConnectorAttribute(name = "will-retain", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the will message must be retained", defaultValue = "false")
@ConnectorAttribute(name = "will-qos", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the QoS level for the will message", defaultValue = "0")
@ConnectorAttribute(name = "max-message-size", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set max MQTT message size in bytes", defaultValue = "8092")
@ConnectorAttribute(name = "reconnect-attempts", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the max reconnect attempts", defaultValue = "5")
@ConnectorAttribute(name = "reconnect-interval-seconds", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the reconnect interval in seconds", defaultValue = "1")
@ConnectorAttribute(name = "username", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the username to connect to the server")
@ConnectorAttribute(name = "password", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the password to connect to the server")
Expand Down Expand Up @@ -99,7 +98,7 @@ public boolean isReady() {
public boolean isSourceReady() {
boolean ready = true;
for (MqttSource source : sources) {
ready = ready && source.isSubscribed();
ready = ready && source.isReady();
}
return ready;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package io.smallrye.reactive.messaging.mqtt;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import io.smallrye.reactive.messaging.mqtt.session.ConstantReconnectDelayOptions;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.mqtt.MqttClientOptions;

public class MqttHelpers {

private MqttHelpers() {
// avoid direct instantiation.
}

static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
MqttClientOptions options = new MqttClientOptions();
static MqttClientSessionOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
MqttClientSessionOptions options = new MqttClientSessionOptions();
options.setCleanSession(config.getAutoCleanSession());
options.setAutoGeneratedClientId(config.getAutoGeneratedClientId());
options.setAutoKeepAlive(config.getAutoKeepAlive());
options.setClientId(config.getClientId().orElse(null));
options.setConnectTimeout(config.getConnectTimeoutSeconds());
options.setHostname(config.getHost());
options.setKeepAliveInterval(config.getKeepAliveSeconds());
options.setMaxInflightQueue(config.getMaxInflightQueue());
options.setMaxMessageSize(config.getMaxMessageSize());
options.setPassword(config.getPassword().orElse(null));
options.setReconnectAttempts(config.getReconnectAttempts());
options.setReconnectInterval(TimeUnit.SECONDS.toMillis(config.getReconnectIntervalSeconds()));
options.setPort(config.getPort().orElseGet(() -> config.getSsl() ? 8883 : 1883));
options.setReconnectDelay(getReconnectDelayOptions(config));
options.setSsl(config.getSsl());
options.setKeyCertOptions(getKeyCertOptions(config));
options.setServerName(config.getServerName());
options.setTrustOptions(getTrustOptions(config));
options.setTrustAll(config.getTrustAll());
options.setUsername(config.getUsername().orElse(null));
Expand Down Expand Up @@ -122,4 +126,10 @@ private static TrustOptions getTrustOptions(MqttConnectorCommonConfiguration con
return null;
}

private static ReconnectDelayOptions getReconnectDelayOptions(MqttConnectorCommonConfiguration config) {
ConstantReconnectDelayOptions options = new ConstantReconnectDelayOptions();
options.setDelay(Duration.ofSeconds(config.getReconnectIntervalSeconds()));
return options;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,94 +13,83 @@

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.mqtt.MqttClient;

public class MqttSink {

private final String host;
private final int port;
private final String server;
private final String topic;
private final int qos;

private final SubscriberBuilder<? extends Message<?>, Void> sink;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean ready = new AtomicBoolean();

public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
host = config.getHost();
int def = options.isSsl() ? 8883 : 1883;
port = config.getPort().orElse(def);
server = config.getServerName().orElse(null);
MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);
topic = config.getTopic().orElseGet(config::getChannel);
qos = config.getQos();

AtomicReference<MqttClient> reference = new AtomicReference<>();
AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();
sink = ReactiveStreams.<Message<?>> builder()
.flatMapCompletionStage(msg -> {
MqttClient client = reference.get();
if (client != null) {
if (client.isConnected()) {
connected.set(true);
return CompletableFuture.completedFuture(msg);
} else {
CompletableFuture<Message<?>> future = new CompletableFuture<>();
vertx.setPeriodic(100, id -> {
if (client.isConnected()) {
vertx.cancelTimer(id);
connected.set(true);
future.complete(msg);
}
});
return future;
}
} else {
return Clients.getConnectedClient(vertx, host, port, server, options)
.map(c -> {
reference.set(c);
connected.set(true);
return msg;
})
.subscribeAsCompletionStage();
Clients.ClientHolder client = reference.get();
if (client == null) {
client = Clients.getHolder(vertx, options);
// FIXME: add session state listener
client.start();
reference.set(client);
}

return client.sessionState()
.filter(state -> state != SessionState.CONNECTED)
.map(ignore -> msg).toUni().subscribeAsCompletionStage();

})
.flatMapCompletionStage(msg -> send(reference, msg))
.onComplete(() -> {
MqttClient c = reference.getAndSet(null);
Clients.ClientHolder c = reference.getAndSet(null);
if (c != null) {
connected.set(false);
c.disconnectAndForget();
ready.set(false);
c.close();
}
})
.onError(log::errorWhileSendingMessageToBroker)
.ignore();
}

private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?> msg) {
MqttClient client = reference.get();
String actualTopicToBeUsed = this.topic;
MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
boolean isRetain = false;
private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference, Message<?> msg) {
MqttClientSession client = reference.get().getClient();
final String actualTopicToBeUsed;
final MqttQoS actualQoS;
final boolean isRetain;

if (msg instanceof SendingMqttMessage) {
MqttMessage<?> mm = ((SendingMqttMessage<?>) msg);
actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic();
actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : mm.getQosLevel();
isRetain = mm.isRetain();
} else {
actualTopicToBeUsed = this.topic;
isRetain = false;
actualQoS = MqttQoS.valueOf(this.qos);
}

if (actualTopicToBeUsed == null) {
log.ignoringNoTopicSet();
return CompletableFuture.completedFuture(msg);
}

return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
return AsyncResultUni
.<Integer> toUni(h -> client
.publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain)
.onComplete(h))
.onItemOrFailure().transformToUni((s, f) -> {
if (f != null) {
return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
Expand Down Expand Up @@ -139,6 +128,6 @@ public SubscriberBuilder<? extends Message<?>, Void> getSink() {
}

public boolean isReady() {
return connected.get();
return ready.get();
}
}
Loading

0 comments on commit 566bc7d

Please sign in to comment.