From d22484d52eb46fe797a4dfea131588010d13dd30 Mon Sep 17 00:00:00 2001 From: Marcello Martina Date: Fri, 2 Feb 2024 12:23:38 +0100 Subject: [PATCH] refactor(sparkplug): applies State pattern to SparkplugMqttClient, fixed onConnectionLost status reset Signed-off-by: Marcello Martina --- .../transport/SparkplugDataTransport.java | 2 +- .../mqtt/transport/SparkplugMqttClient.java | 359 +++++++++++------- 2 files changed, 224 insertions(+), 137 deletions(-) diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java index 1c71d347646..cd7c9423d8d 100644 --- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java +++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java @@ -230,8 +230,8 @@ public void removeDataTransportListener(DataTransportListener listener) { @Override public void connectionLost(Throwable arg0) { logger.info("{} - Connection lost", this.kuraServicePid); + this.client.handleConnectionLost(); this.dataTransportListeners.forEach(listener -> callSafely(listener::onConnectionLost, arg0)); - } @Override diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugMqttClient.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugMqttClient.java index 1a255891954..12349576f9c 100644 --- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugMqttClient.java +++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugMqttClient.java @@ -67,73 +67,48 @@ public class SparkplugMqttClient { private SslManagerService sslManagerService; - public enum SessionStatus { - TERMINATED, - ESTABILISHING, - ESTABILISHED - } + private SessionStatus sessionStatus = new Terminated(); - private SessionStatus sessionStatus = SessionStatus.TERMINATED; + /* + * State management + */ - public SparkplugMqttClient(SparkplugDataTransportOptions options, MqttCallback callback, - Set listeners, SslManagerService sslManagerService) { - this.servers = options.getServers(); - this.serversIterator = this.servers.iterator(); - this.clientId = options.getClientId(); - this.options = options.getMqttConnectOptions(); - this.callback = callback; - this.listeners = listeners; + private abstract class SessionStatus { - this.groupId = options.getGroupId(); - this.nodeId = options.getNodeId(); - this.primaryHostId = options.getPrimaryHostApplicationId(); - this.connectionTimeoutMs = options.getConnectionTimeoutMs(); + public abstract SessionStatus estabilishSession(boolean shouldConnectClient) throws KuraConnectException; - this.sslManagerService = sslManagerService; + public abstract SessionStatus terminateSession(boolean shouldDisconnectClient, long quiesceTimeout); - logger.info( - "Sparkplug MQTT client updated" + "\n\tServers: {}" + "\n\tClient ID: {}" + "\n\tGroup ID: {}" - + "\n\tNode ID: {}" + "\n\tPrimary Host Application ID: {}" + "\n\tConnection Timeout (ms): {}", - this.servers, this.clientId, this.groupId, this.nodeId, this.primaryHostId, this.connectionTimeoutMs); - } - - public synchronized boolean isSessionEstabilished() { - return this.sessionStatus == SessionStatus.ESTABILISHED && Objects.nonNull(this.client) - && this.client.isConnected(); - } + public abstract SessionStatus confirmSession(); - public synchronized void estabilishSession(boolean shouldConnectClient) throws KuraConnectException { - if (this.sessionStatus == SessionStatus.TERMINATED) { + SessionStatus toEstabilishing(boolean shouldConnectClient) throws KuraConnectException { try { - updateSessionStatus(SessionStatus.TERMINATED, SessionStatus.ESTABILISHING); - if (shouldConnectClient) { newClientConnection(); } - subscribe(SparkplugTopics.getNodeCommandTopic(this.groupId, this.nodeId), 1); + subscribe(SparkplugTopics.getNodeCommandTopic(SparkplugMqttClient.this.groupId, + SparkplugMqttClient.this.nodeId), 1); - if (this.primaryHostId.isPresent()) { - subscribe(SparkplugTopics.getStateTopic(this.primaryHostId.get()), 1); + if (SparkplugMqttClient.this.primaryHostId.isPresent()) { + subscribe(SparkplugTopics.getStateTopic(SparkplugMqttClient.this.primaryHostId.get()), 1); } else { - confirmSession(); + return toEstabilished(); } } catch (MqttException | GeneralSecurityException | IOException e) { - this.sessionStatus = SessionStatus.TERMINATED; - this.bdSeqCounter = new BdSeqCounter(); + SparkplugMqttClient.this.bdSeqCounter = new BdSeqCounter(); throw new KuraConnectException(e); } - } else { - logInvalidStateTransition(this.sessionStatus, SessionStatus.ESTABILISHING); + + return new Estabilishing(); } - } - public synchronized void terminateSession(boolean shouldDisconnectClient, long quiesceTimeout) { - if (this.sessionStatus == SessionStatus.ESTABILISHED || this.sessionStatus == SessionStatus.ESTABILISHING) { + SessionStatus toTerminated(boolean shouldDisconnectClient, long quiesceTimeout) { try { - this.listeners.forEach(listener -> SparkplugDataTransport.callSafely(listener::onDisconnecting)); + SparkplugMqttClient.this.listeners + .forEach(listener -> SparkplugDataTransport.callSafely(listener::onDisconnecting)); - if (this.sessionStatus == SessionStatus.ESTABILISHED) { + if (SparkplugMqttClient.this.sessionStatus instanceof Estabilished) { sendEdgeNodeDeath(); } @@ -141,25 +116,210 @@ public synchronized void terminateSession(boolean shouldDisconnectClient, long q disconnectClient(quiesceTimeout); } - updateSessionStatus(this.sessionStatus, SessionStatus.TERMINATED); - this.listeners.forEach(listener -> SparkplugDataTransport.callSafely(listener::onDisconnected)); + SparkplugMqttClient.this.listeners + .forEach(listener -> SparkplugDataTransport.callSafely(listener::onDisconnected)); } catch (MqttException e) { logger.error("Error terminating Sparkplug Edge Node session", e); + return SparkplugMqttClient.this.sessionStatus; } - } else { - logInvalidStateTransition(this.sessionStatus, SessionStatus.TERMINATED); + + return new Terminated(); } - } - public synchronized void confirmSession() { - if (this.sessionStatus == SessionStatus.ESTABILISHING) { + SessionStatus toEstabilished() { sendEdgeNodeBirth(); - updateSessionStatus(SessionStatus.ESTABILISHING, SessionStatus.ESTABILISHED); - this.listeners + SparkplugMqttClient.this.listeners .forEach(listener -> SparkplugDataTransport.callSafely(listener::onConnectionEstablished, true)); - } else { - logInvalidStateTransition(this.sessionStatus, SessionStatus.ESTABILISHED); + return new Estabilished(); + } + + private void newClientConnection() throws MqttException, GeneralSecurityException, IOException { + SparkplugMqttClient.this.bdSeqCounter.next(); + setWillMessage(); + logger.debug("bdSeq: {}", SparkplugMqttClient.this.bdSeqCounter.getCurrent()); + + try { + long randomDelay = SparkplugMqttClient.this.randomDelayGenerator.nextInt(5000); + logger.info("Randomly delaying connect by {} ms", randomDelay); + Thread.sleep(randomDelay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SparkplugMqttClient.this.client = new MqttAsyncClient(getNextServer(), SparkplugMqttClient.this.clientId, + new MemoryPersistence()); + SparkplugMqttClient.this.client.setCallback(SparkplugMqttClient.this.callback); + + IMqttToken token = SparkplugMqttClient.this.client.connect(SparkplugMqttClient.this.options); + token.waitForCompletion(SparkplugMqttClient.this.connectionTimeoutMs); + + logger.debug("Client connected"); + } + + private void disconnectClient(long quiesceTimeout) throws MqttException { + if (SparkplugMqttClient.this.client.isConnected()) { + IMqttToken token = SparkplugMqttClient.this.client.disconnect(quiesceTimeout); + token.waitForCompletion(SparkplugMqttClient.this.connectionTimeoutMs); + } + + logger.debug("Client disconnected"); + } + + private void setWillMessage() { + String topic = SparkplugTopics.getNodeDeathTopic(SparkplugMqttClient.this.groupId, + SparkplugMqttClient.this.nodeId); + byte[] payload = SparkplugPayloads.getNodeDeathPayload(SparkplugMqttClient.this.bdSeqCounter.getCurrent()); + SparkplugMqttClient.this.options.setWill(topic, payload, 1, false); + } + + private void sendEdgeNodeBirth() { + String topic = SparkplugTopics.getNodeBirthTopic(SparkplugMqttClient.this.groupId, + SparkplugMqttClient.this.nodeId); + byte[] payload = SparkplugPayloads.getNodeBirthPayload(SparkplugMqttClient.this.bdSeqCounter.getCurrent(), + 0); + publish(topic, payload, 0, false); + logger.debug("Published Edge Node BIRTH with bdSeq {}", SparkplugMqttClient.this.bdSeqCounter.getCurrent()); + } + + private void sendEdgeNodeDeath() { + String topic = SparkplugTopics.getNodeDeathTopic(SparkplugMqttClient.this.groupId, + SparkplugMqttClient.this.nodeId); + byte[] payload = SparkplugPayloads.getNodeDeathPayload(SparkplugMqttClient.this.bdSeqCounter.getCurrent()); + publish(topic, payload, 0, false); + logger.debug("Published Edge Node DEATH with bdSeq {}", SparkplugMqttClient.this.bdSeqCounter.getCurrent()); + } + + private String getNextServer() throws GeneralSecurityException, IOException { + String server; + if (SparkplugMqttClient.this.serversIterator.hasNext()) { + server = SparkplugMqttClient.this.serversIterator.next(); + } else { + SparkplugMqttClient.this.serversIterator = SparkplugMqttClient.this.servers.iterator(); + server = SparkplugMqttClient.this.serversIterator.next(); + } + + setSocketFactory(server); + + logger.info("Selecting next server {} from {}", server, SparkplugMqttClient.this.servers); + return server; + } + + private void setSocketFactory(String server) throws GeneralSecurityException, IOException { + if (server.startsWith("ssl")) { + SparkplugMqttClient.this.options + .setSocketFactory(SparkplugMqttClient.this.sslManagerService.getSSLSocketFactory()); + } else { + SparkplugMqttClient.this.options.setSocketFactory(SocketFactory.getDefault()); + } + } + + } + + private class Terminated extends SessionStatus { + + @Override + public SessionStatus estabilishSession(boolean shouldConnectClient) throws KuraConnectException { + return toEstabilishing(shouldConnectClient); + } + + @Override + public SessionStatus terminateSession(boolean shouldDisconnectClient, long quiesceTimeout) { + return this; + } + + @Override + public SessionStatus confirmSession() { + return this; + } + + } + + private class Estabilishing extends SessionStatus { + + @Override + public SessionStatus estabilishSession(boolean shouldConnectClient) throws KuraConnectException { + return this; + } + + @Override + public SessionStatus terminateSession(boolean shouldDisconnectClient, long quiesceTimeout) { + return toTerminated(shouldDisconnectClient, quiesceTimeout); + } + + @Override + public SessionStatus confirmSession() { + return toEstabilished(); + } + + } + + private class Estabilished extends SessionStatus { + + @Override + public SessionStatus estabilishSession(boolean shouldConnectClient) throws KuraConnectException { + return this; + } + + @Override + public SessionStatus terminateSession(boolean shouldDisconnectClient, long quiesceTimeout) { + return toTerminated(shouldDisconnectClient, quiesceTimeout); + } + + @Override + public SessionStatus confirmSession() { + return toEstabilished(); } + + } + + /* + * Public methods + */ + + public SparkplugMqttClient(SparkplugDataTransportOptions options, MqttCallback callback, + Set listeners, SslManagerService sslManagerService) { + this.servers = options.getServers(); + this.serversIterator = this.servers.iterator(); + this.clientId = options.getClientId(); + this.options = options.getMqttConnectOptions(); + this.callback = callback; + this.listeners = listeners; + + this.groupId = options.getGroupId(); + this.nodeId = options.getNodeId(); + this.primaryHostId = options.getPrimaryHostApplicationId(); + this.connectionTimeoutMs = options.getConnectionTimeoutMs(); + + this.sslManagerService = sslManagerService; + + logger.info( + "Sparkplug MQTT client updated" + "\n\tServers: {}" + "\n\tClient ID: {}" + "\n\tGroup ID: {}" + + "\n\tNode ID: {}" + "\n\tPrimary Host Application ID: {}" + "\n\tConnection Timeout (ms): {}", + this.servers, this.clientId, this.groupId, this.nodeId, this.primaryHostId, this.connectionTimeoutMs); + } + + public synchronized boolean isSessionEstabilished() { + return this.sessionStatus instanceof Estabilished && Objects.nonNull(this.client) + && this.client.isConnected(); + } + + public synchronized void handleConnectionLost() { + doSessionTransition(new Terminated()); + } + + public synchronized void estabilishSession(boolean shouldConnectClient) throws KuraConnectException { + logger.debug("Requested session estabilishment"); + doSessionTransition(this.sessionStatus.estabilishSession(shouldConnectClient)); + } + + public synchronized void terminateSession(boolean shouldDisconnectClient, long quiesceTimeout) { + logger.debug("Requested session termination"); + doSessionTransition(this.sessionStatus.terminateSession(shouldDisconnectClient, quiesceTimeout)); + } + + public synchronized void confirmSession() { + logger.debug("Requested session confirmation"); + doSessionTransition(this.sessionStatus.confirmSession()); } public synchronized IMqttDeliveryToken publish(String topic, byte[] payload, int qos, boolean isRetained) { @@ -205,89 +365,16 @@ public synchronized Runnable getMessageDispatcher(String topic, MqttMessage mess * Private methods */ - private String getNextServer() throws GeneralSecurityException, IOException { - String server; - if (this.serversIterator.hasNext()) { - server = this.serversIterator.next(); - } else { - this.serversIterator = this.servers.iterator(); - server = this.serversIterator.next(); - } - - setSocketFactory(server); - - logger.info("Selecting next server {} from {}", server, this.servers); - return server; - } + private void doSessionTransition(SessionStatus newStatus) { + String from = this.sessionStatus.getClass().getSimpleName(); + String to = newStatus.getClass().getSimpleName(); - private void setSocketFactory(String server) throws GeneralSecurityException, IOException { - if (server.startsWith("ssl")) { - this.options.setSocketFactory(this.sslManagerService.getSSLSocketFactory()); - } else { - this.options.setSocketFactory(SocketFactory.getDefault()); + if (!from.equals(to)) { + logger.info("Sparkplug session: {} -> {}", from, to); + this.sessionStatus = newStatus; } } - private void setWillMessage() { - String topic = SparkplugTopics.getNodeDeathTopic(this.groupId, this.nodeId); - byte[] payload = SparkplugPayloads.getNodeDeathPayload(this.bdSeqCounter.getCurrent()); - this.options.setWill(topic, payload, 1, false); - } - - private void sendEdgeNodeBirth() { - String topic = SparkplugTopics.getNodeBirthTopic(this.groupId, this.nodeId); - byte[] payload = SparkplugPayloads.getNodeBirthPayload(this.bdSeqCounter.getCurrent(), 0); - publish(topic, payload, 0, false); - logger.debug("Published Edge Node BIRTH with bdSeq {}", this.bdSeqCounter.getCurrent()); - } - - private void sendEdgeNodeDeath() { - String topic = SparkplugTopics.getNodeDeathTopic(this.groupId, this.nodeId); - byte[] payload = SparkplugPayloads.getNodeDeathPayload(this.bdSeqCounter.getCurrent()); - publish(topic, payload, 0, false); - logger.debug("Published Edge Node DEATH with bdSeq {}", this.bdSeqCounter.getCurrent()); - } - - private void newClientConnection() throws MqttException, GeneralSecurityException, IOException { - this.bdSeqCounter.next(); - setWillMessage(); - logger.debug("bdSeq: {}", this.bdSeqCounter.getCurrent()); - - try { - long randomDelay = this.randomDelayGenerator.nextInt(5000); - logger.info("Randomly delaying connect by {} ms", randomDelay); - Thread.sleep(randomDelay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - - this.client = new MqttAsyncClient(getNextServer(), this.clientId, new MemoryPersistence()); - this.client.setCallback(this.callback); - - IMqttToken token = this.client.connect(this.options); - token.waitForCompletion(this.connectionTimeoutMs); - - logger.debug("Client connected"); - } - - private void disconnectClient(long quiesceTimeout) throws MqttException { - if (this.client.isConnected()) { - IMqttToken token = this.client.disconnect(quiesceTimeout); - token.waitForCompletion(this.connectionTimeoutMs); - } - - logger.debug("Client disconnected"); - } - - private void updateSessionStatus(SessionStatus from, SessionStatus to) { - logger.info("Sparkplug Session: {} -> {}", from, to); - this.sessionStatus = to; - } - - private void logInvalidStateTransition(SessionStatus from, SessionStatus to) { - logger.debug("Invalid state transition {} -> {}, ignoring request", from, to); - } - private synchronized void dispatchMessage(String topic, MqttMessage message) { boolean isValidStateMessage = this.primaryHostId.isPresent() && topic.equals(SparkplugTopics.getStateTopic(this.primaryHostId.get()));