Skip to content

Commit

Permalink
Fix broken MQTT client by implementing a reconnect "session" on top o…
Browse files Browse the repository at this point in the history
…f the client

This is based on the content of PR vert-x3/vertx-mqtt#197
  • Loading branch information
ctron committed May 27, 2021
1 parent 6b90711 commit b1d89ee
Show file tree
Hide file tree
Showing 24 changed files with 1,498 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import java.util.concurrent.ConcurrentHashMap;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.vertx.mqtt.MqttClientOptions;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.MqttClient;
import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;

public class Clients {
Expand All @@ -20,24 +21,13 @@ private Clients() {
// avoid direct instantiation.
}

static Uni<MqttClient> getConnectedClient(Vertx vertx, String host, int port, String server,
MqttClientOptions options) {
String id = host + port + "<" + (server == null ? "" : server)
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
ClientHolder holder = clients.computeIfAbsent(id, key -> {
MqttClient client = MqttClient.create(vertx, options);
return new ClientHolder(client, host, port, server);
});
return holder.connect();
}

static ClientHolder getHolder(Vertx vertx, String host, int port, String server,
MqttClientOptions options) {
MqttClientSessionOptions options) {

String id = host + port + "<" + (server == null ? "" : server)
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
return clients.computeIfAbsent(id, key -> {
MqttClient client = MqttClient.create(vertx, options);
MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options);
return new ClientHolder(client, host, port, server);
});
}
Expand All @@ -52,33 +42,53 @@ public static void clear() {

public static class ClientHolder {

private final MqttClient client;
private final Uni<MqttConnAckMessage> connection;
private final MqttClientSession client;
private final BroadcastProcessor<MqttPublishMessage> messages;
private final BroadcastProcessor<SessionState> sessionState;
private final BroadcastProcessor<SubscriptionEvent> subscriptionState;

public ClientHolder(MqttClient client, String host, int port, String server) {
public ClientHolder(MqttClientSession client, String host, int port, String server) {
this.client = client;
this.connection = client.connect(port, host, server).memoize().indefinitely();
messages = BroadcastProcessor.create();
client.publishHandler(messages::onNext);
client.closeHandler(messages::onComplete);
sessionState = BroadcastProcessor.create();
subscriptionState = BroadcastProcessor.create();
client.messageHandler(m -> messages.onNext(MqttPublishMessage.newInstance(m)));
client.exceptionHandler(messages::onError);
client.sessionStateHandler(evt -> sessionState.onNext(evt.getSessionState()));
client.subscriptionStateHandler(subscriptionState::onNext);
}

public Uni<MqttClient> connect() {
return connection
.map(ignored -> client);
public void start() {
client.start();
}

public void close() {
if (client.isConnected()) {
client.disconnectAndAwait();
}
client.stop();
}

public Multi<SessionState> sessionState() {
return Multi.createFrom()
.item(this.client.getState())
.onCompletion()
.switchTo(this.sessionState);
}

public Multi<SubscriptionState> subscriptionState(String topic) {
return Multi.createFrom()
.item(this.client.getSubscriptionState(topic))
.onCompletion()
.switchTo(this.subscriptionState
.filter(evt -> evt.getTopic().equals(topic))
.map(SubscriptionEvent::getSubscriptionState));
}

public Multi<MqttPublishMessage> stream() {
return messages;
}

public MqttClientSession getClient() {
return client;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
@ConnectorAttribute(name = "will-retain", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the will message must be retained", defaultValue = "false")
@ConnectorAttribute(name = "will-qos", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the QoS level for the will message", defaultValue = "0")
@ConnectorAttribute(name = "max-message-size", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set max MQTT message size in bytes", defaultValue = "8092")
@ConnectorAttribute(name = "reconnect-attempts", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the max reconnect attempts", defaultValue = "5")
@ConnectorAttribute(name = "reconnect-interval-seconds", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the reconnect interval in seconds", defaultValue = "1")
@ConnectorAttribute(name = "username", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the username to connect to the server")
@ConnectorAttribute(name = "password", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the password to connect to the server")
Expand Down Expand Up @@ -99,7 +98,7 @@ public boolean isReady() {
public boolean isSourceReady() {
boolean ready = true;
for (MqttSource source : sources) {
ready = ready && source.isSubscribed();
ready = ready && source.isReady();
}
return ready;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.mqtt.MqttClientOptions;

public class MqttHelpers {

private MqttHelpers() {
// avoid direct instantiation.
}

static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
MqttClientOptions options = new MqttClientOptions();
static MqttClientSessionOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
MqttClientSessionOptions options = new MqttClientSessionOptions();
options.setCleanSession(config.getAutoCleanSession());
options.setAutoGeneratedClientId(config.getAutoGeneratedClientId());
options.setAutoKeepAlive(config.getAutoKeepAlive());
Expand All @@ -28,7 +28,6 @@ static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguratio
options.setMaxInflightQueue(config.getMaxInflightQueue());
options.setMaxMessageSize(config.getMaxMessageSize());
options.setPassword(config.getPassword().orElse(null));
options.setReconnectAttempts(config.getReconnectAttempts());
options.setReconnectInterval(TimeUnit.SECONDS.toMillis(config.getReconnectIntervalSeconds()));
options.setSsl(config.getSsl());
options.setKeyCertOptions(getKeyCertOptions(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.SessionState;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.mqtt.MqttClient;

public class MqttSink {

Expand All @@ -30,77 +32,71 @@ public class MqttSink {
private final int qos;

private final SubscriberBuilder<? extends Message<?>, Void> sink;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean ready = new AtomicBoolean();

public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);
host = config.getHost();
int def = options.isSsl() ? 8883 : 1883;
port = config.getPort().orElse(def);
server = config.getServerName().orElse(null);
topic = config.getTopic().orElseGet(config::getChannel);
qos = config.getQos();

AtomicReference<MqttClient> reference = new AtomicReference<>();
AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();
sink = ReactiveStreams.<Message<?>> builder()
.flatMapCompletionStage(msg -> {
MqttClient client = reference.get();
if (client != null) {
if (client.isConnected()) {
connected.set(true);
return CompletableFuture.completedFuture(msg);
} else {
CompletableFuture<Message<?>> future = new CompletableFuture<>();
vertx.setPeriodic(100, id -> {
if (client.isConnected()) {
vertx.cancelTimer(id);
connected.set(true);
future.complete(msg);
}
});
return future;
}
} else {
return Clients.getConnectedClient(vertx, host, port, server, options)
.map(c -> {
reference.set(c);
connected.set(true);
return msg;
})
.subscribeAsCompletionStage();
Clients.ClientHolder client = reference.get();
if (client == null) {
client = Clients.getHolder(vertx, host, port, server, options);
// FIXME: add session state listener
client.start();
reference.set(client);
}

return client.sessionState()
.filter(state -> state != SessionState.CONNECTED)
.map(ignore -> msg).toUni().subscribeAsCompletionStage();

})
.flatMapCompletionStage(msg -> send(reference, msg))
.onComplete(() -> {
MqttClient c = reference.getAndSet(null);
Clients.ClientHolder c = reference.getAndSet(null);
if (c != null) {
connected.set(false);
c.disconnectAndForget();
ready.set(false);
c.close();
}
})
.onError(log::errorWhileSendingMessageToBroker)
.ignore();
}

private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?> msg) {
MqttClient client = reference.get();
String actualTopicToBeUsed = this.topic;
MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
boolean isRetain = false;
private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference, Message<?> msg) {
MqttClientSession client = reference.get().getClient();
final String actualTopicToBeUsed;
final MqttQoS actualQoS;
final boolean isRetain;

if (msg instanceof SendingMqttMessage) {
MqttMessage<?> mm = ((SendingMqttMessage<?>) msg);
actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic();
actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : mm.getQosLevel();
isRetain = mm.isRetain();
} else {
actualTopicToBeUsed = this.topic;
isRetain = false;
actualQoS = MqttQoS.valueOf(this.qos);
}

if (actualTopicToBeUsed == null) {
log.ignoringNoTopicSet();
return CompletableFuture.completedFuture(msg);
}

return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
return AsyncResultUni
.<Integer> toUni(h -> client
.publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain)
.onComplete(h))
.onItemOrFailure().transformToUni((s, f) -> {
if (f != null) {
return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
Expand Down Expand Up @@ -139,6 +135,6 @@ public SubscriberBuilder<? extends Message<?>, Void> getSink() {
}

public boolean isReady() {
return connected.get();
return ready.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

import io.vertx.mqtt.MqttClientOptions;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;

public class MqttSource {

private final PublisherBuilder<MqttMessage<?>> source;
private final AtomicBoolean subscribed = new AtomicBoolean();
private final AtomicBoolean ready = new AtomicBoolean();
private final Pattern pattern;

public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);

String host = config.getHost();
int def = options.isSsl() ? 8883 : 1883;
Expand All @@ -41,22 +43,24 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
}

Clients.ClientHolder holder = Clients.getHolder(vertx, host, port, server, options);
holder.subscriptionState(topic)
.onItem().invoke(state -> {
ready.set(state == SubscriptionState.SUBSCRIBED);
});
holder.start();
holder.getClient()
.subscribe(topic, RequestedQoS.valueOf(qos));
this.source = ReactiveStreams.fromPublisher(
holder.connect()
.onItem().transformToMulti(client -> client.subscribe(topic, qos)
.onItem().transformToMulti(x -> {
subscribed.set(true);
return holder.stream()
.select().where(m -> matches(topic, m))
.onItem().transform(m -> new ReceivingMqttMessage(m, onNack));
}))
holder.stream()
.select().where(m -> matches(topic, m))
.onItem().transform(m -> new ReceivingMqttMessage(m, onNack))
.stage(multi -> {
if (broadcast) {
return multi.broadcast().toAllSubscribers();
}
return multi;
})
.onCancellation().invoke(() -> subscribed.set(false))
.onCancellation().invoke(() -> holder.getClient().unsubscribe(topic))
.onFailure().invoke(log::unableToConnectToBroker));
}

Expand All @@ -82,7 +86,8 @@ PublisherBuilder<MqttMessage<?>> getSource() {
return source;
}

boolean isSubscribed() {
return subscribed.get();
public boolean isReady() {
return ready.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ public interface MqttExceptions {
@Message(id = 17000, value = "Unknown failure strategy: %s")
IllegalArgumentException illegalArgumentUnknownStrategy(String strategy);

@Message(id = 17001, value = "Invalid QoS value: %s")
IllegalArgumentException illegalArgumentInvalidQoS(int qos);

}
Loading

0 comments on commit b1d89ee

Please sign in to comment.