diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugCloudEndpoint.xml b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugCloudEndpoint.xml
index d9aaeb1623f..0d686197735 100644
--- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugCloudEndpoint.xml
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugCloudEndpoint.xml
@@ -42,6 +42,12 @@
cardinality="1..1"
bind="setEventAdmin"/>
+
+
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugDataTransport.xml b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugDataTransport.xml
index 6bb17d543dc..4fa70b070b9 100644
--- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugDataTransport.xml
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/SparkplugDataTransport.xml
@@ -28,6 +28,11 @@
+
+
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/metatype/org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport.xml b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/metatype/org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport.xml
index 2308f695852..44fa9b101ce 100644
--- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/metatype/org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport.xml
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/OSGI-INF/metatype/org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport.xml
@@ -19,6 +19,61 @@
name="SparkplugDataTransport"
description="Data Transport layer configuration.">
+
+
+
+
+
+
+
+
+
+
+
+
+
+
properties) {
this.kuraServicePid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
logger.info("{} - Activating", this.kuraServicePid);
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/SparkplugCloudConnectionFactory.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/SparkplugCloudConnectionFactory.java
index 1d0232e638e..775bef9bdc8 100644
--- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/SparkplugCloudConnectionFactory.java
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/SparkplugCloudConnectionFactory.java
@@ -70,6 +70,8 @@ public void createConfiguration(String pid) throws KuraException {
cloudEndpointProperties.put(DATA_SERVICE_REFERENCE_NAME,
String.format(REFERENCE_TARGET_VALUE_FORMAT, dataServicePid));
cloudEndpointProperties.put(KURA_CLOUD_CONNECTION_FACTORY_PID, FACTORY_PID);
+ cloudEndpointProperties.put(DATA_TRANSPORT_SERVICE_REFERENCE_NAME,
+ String.format(REFERENCE_TARGET_VALUE_FORMAT, dataTransportServicePid));
this.configurationService.createFactoryConfiguration(CLOUD_ENDPOINT_FACTORY_PID, pid, cloudEndpointProperties,
false);
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/message/SparkplugPayloads.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/message/SparkplugPayloads.java
new file mode 100644
index 00000000000..08cb41394f2
--- /dev/null
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/message/SparkplugPayloads.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Eurotech and/or its affiliates and others
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Eurotech
+ *******************************************************************************/
+package org.eclipse.kura.cloudconnection.sparkplug.mqtt.message;
+
+import java.util.Date;
+
+import org.eclipse.tahu.protobuf.SparkplugBProto.DataType;
+import org.eclipse.tahu.protobuf.SparkplugBProto.Payload;
+
+public class SparkplugPayloads {
+
+ private SparkplugPayloads() {
+ }
+
+ public static byte[] getNodeDeathPayload(long bdSeq) {
+ return new SparkplugBProtobufPayloadBuilder().withBdSeq(bdSeq, new Date().getTime()).build();
+ }
+
+ public static byte[] getNodeBirthPayload(long bdSeq, long seq) {
+ long timestamp = new Date().getTime();
+
+ Payload.Builder protoMsg = Payload.newBuilder();
+
+ Payload.Metric.Builder bdSeqMetric = Payload.Metric.newBuilder();
+ bdSeqMetric.setName("bdSeq");
+ bdSeqMetric.setLongValue(bdSeq);
+ bdSeqMetric.setDatatype(DataType.Int64.getNumber());
+ bdSeqMetric.setTimestamp(timestamp);
+ protoMsg.addMetrics(bdSeqMetric.build());
+
+ Payload.Metric.Builder rebirthMetric = Payload.Metric.newBuilder();
+ rebirthMetric.setName("Node Control/Rebirth");
+ rebirthMetric.setBooleanValue(false);
+ rebirthMetric.setDatatype(DataType.Boolean_VALUE);
+ protoMsg.addMetrics(rebirthMetric.build());
+
+ protoMsg.setSeq(seq);
+ protoMsg.setTimestamp(timestamp);
+
+ return protoMsg.build().toByteArray();
+ }
+
+}
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java
index 43bd5e21d0a..69e9cb0461a 100644
--- a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransport.java
@@ -12,34 +12,107 @@
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
import org.eclipse.kura.KuraConnectException;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.KuraNotConnectedException;
-import org.eclipse.kura.KuraTimeoutException;
-import org.eclipse.kura.KuraTooManyInflightMessagesException;
+import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugPayloads;
+import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugTopics;
import org.eclipse.kura.configuration.ConfigurableComponent;
+import org.eclipse.kura.configuration.ConfigurationService;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.DataTransportToken;
import org.eclipse.kura.data.transport.listener.DataTransportListener;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SparkplugDataTransport implements ConfigurableComponent, DataTransportService {
+public class SparkplugDataTransport implements ConfigurableComponent, DataTransportService, MqttCallback {
+
+ private static final Logger logger = LoggerFactory.getLogger(SparkplugDataTransport.class);
+
+ private String kuraServicePid;
+ private String sessionId;
+ private SparkplugDataTransportOptions options;
+ private MqttAsyncClient client;
+ private boolean isInitialized = false;
+ private Set dataTransportListeners = new HashSet<>();
/*
* Activation APIs
*/
+ public void setMqttAsyncClient(MqttAsyncClient client) {
+ this.client = client;
+ }
+
public void activate(Map properties) {
+ this.kuraServicePid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
+ logger.info("{} - Activating", this.kuraServicePid);
+ update(properties);
+
+ logger.info("{} - Activated", this.kuraServicePid);
}
public void update(Map properties) {
+ logger.info("{} - Updating", this.kuraServicePid);
+
+ boolean wasConnected = isConnected();
+
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onConfigurationUpdating, wasConnected));
+
+ try {
+ this.options = new SparkplugDataTransportOptions(properties);
+
+ this.client = new MqttAsyncClient(this.options.getPrimaryServerURI(), this.options.getClientId(),
+ new MemoryPersistence());
+ this.client.setCallback(this);
+
+ this.sessionId = getBrokerUrl() + "-" + getClientId();
+
+ if (wasConnected) {
+ disconnect(0);
+ connect();
+ }
+ } catch (KuraException ke) {
+ logger.error("{} - Error in configuration properties", this.kuraServicePid, ke);
+ } catch (MqttException me) {
+ logger.error("{} - Error initializing MQTT client", this.kuraServicePid, me);
+ }
+
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onConfigurationUpdated, wasConnected));
+
+ logger.info("{} - Updated", this.kuraServicePid);
+ }
+ public void initSparkplugParameters(String groupId, String nodeId, long bdSeq) {
+ String topic = SparkplugTopics.getNodeDeathTopic(groupId, nodeId);
+ byte[] payload = SparkplugPayloads.getNodeDeathPayload(bdSeq);
+ this.options.getMqttConnectOptions().setWill(topic, payload, 1, false);
+
+ this.isInitialized = Objects.nonNull(this.client);
}
public void deactivate() {
+ logger.info("{} - Deactivating", this.kuraServicePid);
+
+ if (isConnected()) {
+ disconnect(0);
+ }
+ logger.info("{} - Deactivated", this.kuraServicePid);
}
/*
@@ -48,75 +121,211 @@ public void deactivate() {
@Override
public void connect() throws KuraConnectException {
- // TODO Auto-generated method stub
+ if (isConnected()) {
+ throw new IllegalStateException("MQTT client is already connected");
+ }
+
+ if (!this.isInitialized) {
+ throw new IllegalStateException("MQTT client has not been initialized correctly");
+ }
+
+ try {
+ IMqttToken token = this.client.connect(this.options.getMqttConnectOptions());
+ token.waitForCompletion(this.options.getConnectionTimeoutMs());
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onConnectionEstablished, true));
+
+ logger.info("{} - Connected", this.kuraServicePid);
+ } catch (MqttException e) {
+ throw new KuraConnectException(e, "Error connecting MQTT client");
+ }
}
@Override
public boolean isConnected() {
- // TODO Auto-generated method stub
+ if (this.isInitialized) {
+ return this.client.isConnected();
+ }
return false;
}
@Override
public String getBrokerUrl() {
- // TODO Auto-generated method stub
- return null;
+ if (this.isInitialized) {
+ return isConnected() ? this.client.getCurrentServerURI() : this.client.getServerURI();
+ }
+ return "";
}
@Override
public String getAccountName() {
- // TODO Auto-generated method stub
- return null;
+ return "";
}
@Override
public String getUsername() {
- // TODO Auto-generated method stub
- return null;
+ if (this.isInitialized) {
+ return this.options.getUsername();
+ }
+ return "";
}
@Override
public String getClientId() {
- // TODO Auto-generated method stub
- return null;
+ if (this.isInitialized) {
+ return this.options.getClientId();
+ }
+ return "";
}
@Override
public void disconnect(long quiesceTimeout) {
- // TODO Auto-generated method stub
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onDisconnecting));
+
+ if (isConnected()) {
+ try {
+ IMqttToken token = this.client.disconnect(quiesceTimeout);
+ token.waitForCompletion(this.options.getConnectionTimeoutMs());
+
+ logger.info("{} - Disconnected", this.kuraServicePid);
+ } catch (MqttException e) {
+ logger.error("{} - Error disconnecting", this.kuraServicePid, e);
+ }
+ } else {
+ logger.warn("{} - Already disconnected", this.kuraServicePid);
+ }
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onDisconnected));
}
@Override
- public void subscribe(String topic, int qos) throws KuraTimeoutException, KuraException, KuraNotConnectedException {
- // TODO Auto-generated method stub
+ public void subscribe(String topic, int qos) throws KuraException {
+ checkConnectedAndInitialized();
+
+ try {
+ IMqttToken token = this.client.subscribe(topic, qos);
+ token.waitForCompletion(this.options.getConnectionTimeoutMs());
+
+ logger.info("{} - Subscribed to topic {} with QoS {}", this.kuraServicePid, topic, qos);
+ } catch (MqttException e) {
+ logger.error("{} - Error subscribing", this.kuraServicePid, e);
+ }
}
@Override
- public void unsubscribe(String topic) throws KuraTimeoutException, KuraException, KuraNotConnectedException {
- // TODO Auto-generated method stub
+ public void unsubscribe(String topic) throws KuraException {
+ checkConnectedAndInitialized();
+
+ try {
+ IMqttToken token = this.client.unsubscribe(topic);
+ token.waitForCompletion(this.options.getConnectionTimeoutMs());
+ logger.info("{} - Unsubscribed from topic {}", this.kuraServicePid, topic);
+ } catch (MqttException e) {
+ logger.error("{} - Error unsubscribing", this.kuraServicePid, e);
+ }
}
@Override
- public DataTransportToken publish(String topic, byte[] payload, int qos, boolean retain)
- throws KuraTooManyInflightMessagesException, KuraException, KuraNotConnectedException {
- // TODO Auto-generated method stub
+ public DataTransportToken publish(String topic, byte[] payload, int qos, boolean retain) throws KuraException {
+ checkConnectedAndInitialized();
+
+ logger.info("{} - Publishing message on topic {} with QoS {} and retain {}", this.kuraServicePid, topic, qos,
+ retain);
+
+ try {
+ IMqttDeliveryToken deliveryToken = this.client.publish(topic, payload, qos, retain);
+
+ if (qos > 0) {
+ return new DataTransportToken(deliveryToken.getMessageId(), this.sessionId);
+ }
+ } catch (MqttException pe) {
+ logger.error("{} - Error publishing", this.kuraServicePid, pe);
+ }
+
return null;
}
@Override
public void addDataTransportListener(DataTransportListener listener) {
- // TODO Auto-generated method stub
-
+ logger.debug("{} - Adding DataTransportListener {}", this.kuraServicePid, listener.getClass().getName());
+ this.dataTransportListeners.add(listener);
}
@Override
public void removeDataTransportListener(DataTransportListener listener) {
- // TODO Auto-generated method stub
+ logger.debug("{} - Removing DataTransportListener {}", this.kuraServicePid, listener.getClass().getName());
+ this.dataTransportListeners.remove(listener);
+ }
+
+ /*
+ * MqttCallback APIs
+ */
+
+ @Override
+ public void connectionLost(Throwable arg0) {
+ logger.info("{} - Connection lost", this.kuraServicePid);
+ this.dataTransportListeners.forEach(listener -> callSafely(listener::onConnectionLost, arg0));
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken deliveryToken) {
+ try {
+ if (deliveryToken.getMessage().getQos() > 0) {
+ DataTransportToken dataTransportToken = new DataTransportToken(deliveryToken.getMessageId(),
+ this.sessionId);
+
+ this.dataTransportListeners
+ .forEach(listener -> callSafely(listener::onMessageConfirmed, dataTransportToken));
+ }
+ } catch (MqttException e) {
+ logger.error("{} - Error processing MQTTDeliveryToken", this.kuraServicePid, e);
+ }
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ logger.debug("{} - Message arrived on topic {} with QoS {}", this.kuraServicePid, topic, message.getQos());
+
+ this.dataTransportListeners.forEach(listener -> {
+ try {
+ listener.onMessageArrived(topic, message.getPayload(), message.getQos(), message.isRetained());
+ } catch (Exception e) {
+ logger.error("{} - Error processing onMessageArrived for listener {}", this.kuraServicePid,
+ listener.getClass().getName(), e);
+ }
+ });
+ }
+
+ /*
+ * Utils
+ */
+
+ private void checkConnectedAndInitialized() throws KuraNotConnectedException {
+ if (!this.isInitialized) {
+ throw new IllegalStateException("MQTT client has not been initialized correctly");
+ }
+
+ if (!isConnected()) {
+ throw new KuraNotConnectedException("MQTT client is not connected");
+ }
+ }
+
+ private void callSafely(Runnable f) {
+ try {
+ f.run();
+ } catch (Exception e) {
+ logger.error("{} - An error occured in listener {}", this.kuraServicePid, f.getClass().getName(), e);
+ }
+ }
+ private void callSafely(Consumer f, T argument) {
+ try {
+ f.accept(argument);
+ } catch (Exception e) {
+ logger.error("{} - An error occured in listener {}", this.kuraServicePid, f.getClass().getName(), e);
+ }
}
}
diff --git a/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransportOptions.java b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransportOptions.java
new file mode 100644
index 00000000000..e9c16d0c834
--- /dev/null
+++ b/kura/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider/src/main/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/SparkplugDataTransportOptions.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Eurotech and/or its affiliates and others
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Eurotech
+ *******************************************************************************/
+package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import org.eclipse.kura.KuraErrorCode;
+import org.eclipse.kura.KuraException;
+import org.eclipse.kura.configuration.Password;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+public class SparkplugDataTransportOptions {
+
+ public static final String KEY_SERVER_URIS = "server.uris";
+ public static final String KEY_CLIENT_ID = "client.id";
+ public static final String KEY_USERNAME = "username";
+ public static final String KEY_PASSWORD = "password";
+ public static final String KEY_KEEP_ALIVE = "keep.alive";
+ public static final String KEY_CONNECTION_TIMEOUT = "connection.timeout";
+
+ private MqttConnectOptions connectionOptions = new MqttConnectOptions();
+ private String clientId;
+
+ public SparkplugDataTransportOptions(Map properties) throws KuraException {
+ String servers = getString(KEY_SERVER_URIS, properties).orElseThrow(getKuraException(KEY_SERVER_URIS));
+ String clientIdentifier = getString(KEY_CLIENT_ID, properties).orElseThrow(getKuraException(KEY_CLIENT_ID));
+
+ if (servers.isEmpty()) {
+ throw getKuraException(KEY_SERVER_URIS).get();
+ }
+
+ if (clientIdentifier.isEmpty()) {
+ throw getKuraException(KEY_CLIENT_ID).get();
+ }
+
+ Optional username = getString(KEY_USERNAME, properties);
+ Optional password = getPassword(KEY_PASSWORD, properties);
+ Optional connectionTimeout = getInteger(KEY_CONNECTION_TIMEOUT, properties);
+ Optional keepAlive = getInteger(KEY_KEEP_ALIVE, properties);
+
+ this.connectionOptions.setServerURIs(getServerURIs(servers));
+ this.clientId = clientIdentifier;
+
+ if (username.isPresent()) {
+ this.connectionOptions.setUserName(username.get());
+ }
+
+ if (password.isPresent()) {
+ this.connectionOptions.setPassword(password.get().getPassword());
+ }
+
+ if (connectionTimeout.isPresent()) {
+ this.connectionOptions.setConnectionTimeout(connectionTimeout.get());
+ }
+
+ if (keepAlive.isPresent()) {
+ this.connectionOptions.setKeepAliveInterval(keepAlive.get());
+ }
+
+ this.connectionOptions.setCleanSession(true);
+ this.connectionOptions.setAutomaticReconnect(false);
+ }
+
+ public MqttConnectOptions getMqttConnectOptions() {
+ return this.connectionOptions;
+ }
+
+ public String getClientId() {
+ return this.clientId;
+ }
+
+ public String getUsername() {
+ return this.connectionOptions.getUserName();
+ }
+
+ public long getConnectionTimeoutMs() {
+ return this.connectionOptions.getConnectionTimeout() * 1000L;
+ }
+
+ public String getPrimaryServerURI() {
+ return this.connectionOptions.getServerURIs()[0];
+ }
+
+ private Optional getString(String key, Map map) {
+ return Optional.ofNullable((String) map.get(key));
+ }
+
+ private Optional getPassword(String key, Map map) {
+ return Optional.ofNullable((Password) map.get(key));
+ }
+
+ private Optional getInteger(String key, Map map) {
+ return Optional.ofNullable((Integer) map.get(key));
+ }
+
+ private Supplier getKuraException(String propertyName) {
+ return () -> new KuraException(KuraErrorCode.INVALID_PARAMETER,
+ "The property " + propertyName + " is null or empty");
+ }
+
+ private String[] getServerURIs(String spaceSeparatedservers) throws KuraException {
+ String[] servers = spaceSeparatedservers.split(" ");
+ for (String server : servers) {
+ if (server.endsWith("/") || server.isEmpty()) {
+ throw new KuraException(KuraErrorCode.INVALID_PARAMETER,
+ "Server URI cannot be empty, or end with '/', or contain a path");
+ }
+ }
+ return servers;
+ }
+
+}
diff --git a/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/test/SparkplugCloudConnectionFactoryTest.java b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/test/SparkplugCloudConnectionFactoryTest.java
index 09d155ab22d..15e124fdb7f 100644
--- a/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/test/SparkplugCloudConnectionFactoryTest.java
+++ b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/factory/test/SparkplugCloudConnectionFactoryTest.java
@@ -67,6 +67,8 @@ public void shouldCreateCloudStackComponentsWithSuffix() throws KuraException {
"(kura.service.pid=org.eclipse.kura.cloudconnection.sparkplug.mqtt.data.SparkplugDataService-test)")
.withProperty(CloudConnectionFactory.KURA_CLOUD_CONNECTION_FACTORY_PID,
"org.eclipse.kura.cloudconnection.sparkplug.mqtt.factory.SparkplugCloudConnectionFactory")
+ .withProperty("DataTransportService.target",
+ "(kura.service.pid=org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport-test)")
.build(),
false);
thenFactoryComponentIsCreated(
@@ -96,6 +98,8 @@ public void shouldCreateCloudStackComponentsWithoutSuffix() throws KuraException
"(kura.service.pid=org.eclipse.kura.cloudconnection.sparkplug.mqtt.data.SparkplugDataService)")
.withProperty(CloudConnectionFactory.KURA_CLOUD_CONNECTION_FACTORY_PID,
"org.eclipse.kura.cloudconnection.sparkplug.mqtt.factory.SparkplugCloudConnectionFactory")
+ .withProperty("DataTransportService.target",
+ "(kura.service.pid=org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport)")
.build(),
false);
thenFactoryComponentIsCreated(
diff --git a/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportOptionsTest.java b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportOptionsTest.java
new file mode 100644
index 00000000000..6dcdb0c6c1b
--- /dev/null
+++ b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportOptionsTest.java
@@ -0,0 +1,278 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Eurotech and/or its affiliates and others
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Eurotech
+ *******************************************************************************/
+package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.eclipse.kura.KuraException;
+import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransportOptions;
+import org.eclipse.kura.configuration.Password;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class SparkplugDataTransportOptionsTest {
+
+ /*
+ * Scenarios
+ */
+
+ @RunWith(Parameterized.class)
+ public static class ServerUrisParameterFailTest extends Steps {
+
+ @Parameters
+ public static Collection uris() {
+ List data = new LinkedList<>();
+ data.add(null);
+ data.add("");
+ data.add("tcp://broker:1883/");
+ data.add("tcp://broker:1883 tcp://broker:1883/");
+ data.add("tcp://broker:1883 tcp://broker2:1883");
+ return data;
+ }
+
+ private String serverUris;
+
+ public ServerUrisParameterFailTest(String param) {
+ this.serverUris = param;
+ }
+
+ @Test
+ public void shouldThrowKuraExceptionOnWrongServerUri() {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, this.serverUris);
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+
+ whenOptionsCreated();
+
+ thenExceptionOccurred(KuraException.class);
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class ClientIdParameterFailTest extends Steps {
+
+ @Parameters
+ public static Collection uris() {
+ List data = new LinkedList<>();
+ data.add(null);
+ data.add("");
+ return data;
+ }
+
+ private String clientId;
+
+ public ClientIdParameterFailTest(String param) {
+ this.clientId = param;
+ }
+
+ @Test
+ public void shouldThrowKuraExceptionOnWrongClientId() {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, this.clientId);
+
+ whenOptionsCreated();
+
+ thenExceptionOccurred(KuraException.class);
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class ServerUrisParameterTest extends Steps {
+
+ @Parameters
+ public static Collection uris() {
+ List data = new LinkedList<>();
+ data.add("tcp://broker:1883");
+ data.add("tcp://broker1:1883 tcp://broker2:1883");
+ return data;
+ }
+
+ private String serverUris;
+
+ public ServerUrisParameterTest(String param) {
+ this.serverUris = param;
+ }
+
+ @Test
+ public void shouldReturnCorrectPrimaryServerId() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, this.serverUris);
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenOptionsCreated();
+
+ whenGetPrimaryServerURI();
+
+ thenReturnedStringEquals(this.serverUris.split(" ")[0]);
+ }
+ }
+
+ public static class GettersTest extends Steps {
+
+ @Test
+ public void shouldReturnCorrectClientId() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenOptionsCreated();
+
+ whenGetClientId();
+
+ thenReturnedStringEquals("test");
+ }
+
+ @Test
+ public void shouldReturnCorrectUsername() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenProperty(SparkplugDataTransportOptions.KEY_USERNAME, "user");
+ givenOptionsCreated();
+
+ whenGetUsername();
+
+ thenReturnedStringEquals("user");
+ }
+
+ @Test
+ public void shouldReturnCorrectConnectionTimeoutMs() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenProperty(SparkplugDataTransportOptions.KEY_CONNECTION_TIMEOUT, 2);
+ givenOptionsCreated();
+
+ whenGetConnectionTimeoutMs();
+
+ thenReturnedLongEquals(2000);
+ }
+ }
+
+ public static class MqttConnectOptionsTest extends Steps {
+
+ @Test
+ public void shouldReturnCorrectPassword() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenProperty(SparkplugDataTransportOptions.KEY_PASSWORD, new Password("secret"));
+ givenOptionsCreated();
+
+ whenGetMqttConnectOptions();
+
+ thenMqttConnectOptionsContainsPassword("secret");
+ }
+
+ @Test
+ public void shouldReturnCorrectKeepAlive() throws KuraException {
+ givenProperty(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ givenProperty(SparkplugDataTransportOptions.KEY_CLIENT_ID, "test");
+ givenProperty(SparkplugDataTransportOptions.KEY_KEEP_ALIVE, 12);
+ givenOptionsCreated();
+
+ whenGetMqttConnectOptions();
+
+ thenMqttConnectOptionsContainsKeepAlive(12);
+ }
+
+ }
+
+ /*
+ * Steps
+ */
+
+ public abstract static class Steps {
+
+ private SparkplugDataTransportOptions options;
+ private Map properties = new HashMap<>();
+ private Exception occurredException;
+ private String returnedString;
+ private long returnedLong;
+ private MqttConnectOptions mqttConnectOptions;
+
+ /*
+ * Given
+ */
+
+ void givenProperty(String key, Object value) {
+ this.properties.put(key, value);
+ }
+
+ void givenOptionsCreated() throws KuraException {
+ this.options = new SparkplugDataTransportOptions(this.properties);
+ }
+
+ /*
+ * When
+ */
+
+ void whenOptionsCreated() {
+ try {
+ givenOptionsCreated();
+ } catch (Exception e) {
+ this.occurredException = e;
+ }
+ }
+
+ void whenGetPrimaryServerURI() {
+ this.returnedString = this.options.getPrimaryServerURI();
+ }
+
+ void whenGetClientId() {
+ this.returnedString = this.options.getClientId();
+ }
+
+ void whenGetUsername() {
+ this.returnedString = this.options.getUsername();
+ }
+
+ void whenGetConnectionTimeoutMs() {
+ this.returnedLong = this.options.getConnectionTimeoutMs();
+ }
+
+ void whenGetMqttConnectOptions() {
+ this.mqttConnectOptions = this.options.getMqttConnectOptions();
+ }
+
+ /*
+ * Then
+ */
+
+ void thenExceptionOccurred(Class expectedException) {
+ assertNotNull(this.occurredException);
+ assertEquals(expectedException.getName(), this.occurredException.getClass().getName());
+ }
+
+ void thenReturnedStringEquals(String expectedResult) {
+ assertEquals(expectedResult, this.returnedString);
+ }
+
+ void thenReturnedLongEquals(long expectedLong) {
+ assertEquals(expectedLong, this.returnedLong);
+ }
+
+ void thenMqttConnectOptionsContainsPassword(String expectedPassword) {
+ assertEquals(expectedPassword, new String(this.mqttConnectOptions.getPassword()));
+ }
+
+ void thenMqttConnectOptionsContainsKeepAlive(int expectedKeepAlive) {
+ assertEquals(expectedKeepAlive, this.mqttConnectOptions.getKeepAliveInterval());
+ }
+ }
+
+}
diff --git a/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportTest.java b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportTest.java
new file mode 100644
index 00000000000..8eb2acef8c2
--- /dev/null
+++ b/kura/test/org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider.test/src/test/java/org/eclipse/kura/cloudconnection/sparkplug/mqtt/transport/test/SparkplugDataTransportTest.java
@@ -0,0 +1,698 @@
+/*******************************************************************************
+ * Copyright (c) 2023 Eurotech and/or its affiliates and others
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ *
+ * Contributors:
+ * Eurotech
+ *******************************************************************************/
+package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.eclipse.kura.KuraConnectException;
+import org.eclipse.kura.KuraNotConnectedException;
+import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport;
+import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransportOptions;
+import org.eclipse.kura.configuration.Password;
+import org.eclipse.kura.data.DataTransportToken;
+import org.eclipse.kura.data.transport.listener.DataTransportListener;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SparkplugDataTransportTest {
+
+ private MqttAsyncClient client;
+ private SparkplugDataTransport transport = new SparkplugDataTransport();
+ private DataTransportListener listener = mock(DataTransportListener.class);
+ private Exception occurredException;
+ private String returnedString;
+ private DataTransportToken deliveryToken;
+
+ /*
+ * Scenarios
+ */
+
+ // update
+
+ @Test
+ public void shouldNotifyListenersOnUpdate() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+
+ whenUpdate(getDefaultProperties("test-client", "user"));
+
+ thenListenerNotifiedOnConfigurationUpdating(1, false);
+ thenListenerNotifiedOnConfigurationUpdated(1, false);
+ }
+
+ @Test
+ public void shouldNotifyListenersOnUpdateWasConnected() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 1L);
+
+ whenUpdate(getDefaultProperties("test-client", "user"));
+
+ thenListenerNotifiedOnConfigurationUpdating(1, true);
+ thenListenerNotifiedOnConfigurationUpdated(1, true);
+ }
+
+ @Test
+ public void shouldNotifyListenersOnFailingUpdate() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+
+ whenUpdate(getDefaultProperties("test-client", "user"));
+
+ thenListenerNotifiedOnConfigurationUpdating(1, false);
+ thenListenerNotifiedOnConfigurationUpdated(1, false);
+ }
+
+ // disconnect
+
+ @Test
+ public void shouldDisconnectIfConnected() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenDisconnect(100);
+
+ thenClientDisconnects(1, 100);
+ thenListenerNotifiedOnDisconnecting(1);
+ thenListenerNotifiedOnDisconnected(1);
+ }
+
+ @Test
+ public void shouldNotDisconnectIfNotConnected() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenDisconnect(100);
+
+ thenClientDisconnects(0, 100);
+ thenListenerNotifiedOnDisconnecting(1);
+ thenListenerNotifiedOnDisconnected(1);
+ }
+
+ @Test
+ public void shouldNotThrowExceptionsOnDisconnectFailure() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenDisconnect(100);
+
+ thenListenerNotifiedOnDisconnecting(1);
+ thenListenerNotifiedOnDisconnected(1);
+ }
+
+ // connect
+
+ @Test
+ public void shouldThrowIllegalStateExceptionWhenConnectOnAlreadyConnected() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenConnect();
+
+ thenExceptionOccurred(IllegalStateException.class);
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionWhenNotInitialized() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+
+ whenConnect();
+
+ thenExceptionOccurred(IllegalStateException.class);
+ thenListenerNotifiedConnected(0, true);
+ thenClientConnects(0);
+ }
+
+ @Test
+ public void shouldConnect() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenConnect();
+
+ thenListenerNotifiedConnected(1, true);
+ thenClientConnects(1);
+ }
+
+ @Test
+ public void shouldThrowKuraConnectExceptionOnConnectFailure() throws Exception {
+ givenDataTransportDataListener(this.listener);
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenConnect();
+
+ thenListenerNotifiedConnected(0, true);
+ thenExceptionOccurred(KuraConnectException.class);
+ }
+
+ // getters
+
+ @Test
+ public void shouldReturnCurrentBrokerUrl() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).withCurrentServerURI("current")
+ .withServerURI("broker").build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenGetBrokerUrl();
+
+ thenReturnedStringEquals("current");
+ }
+
+ @Test
+ public void shouldReturnBrokerUrl() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).withCurrentServerURI("current")
+ .withServerURI("broker").build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenGetBrokerUrl();
+
+ thenReturnedStringEquals("broker");
+ }
+
+ @Test
+ public void shouldReturnEmptyBrokerUrlIfNotInitialized() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).withCurrentServerURI("current")
+ .withServerURI("broker").build());
+
+ whenGetBrokerUrl();
+
+ thenReturnedStringEquals("");
+ }
+
+ @Test
+ public void shouldReturnEmptyAccountName() throws Exception {
+ whenGetAccountName();
+
+ thenReturnedStringEquals("");
+ }
+
+ @Test
+ public void shouldReturnCorrectUsername() throws Exception {
+ givenActivated(getDefaultProperties("test-client", "user"));
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenGetUsername();
+
+ thenReturnedStringEquals("user");
+ }
+
+ @Test
+ public void shouldReturnEmptyUsernameIfNotInitialized() throws Exception {
+ givenActivated(getDefaultProperties("test-client", "user"));
+
+ whenGetUsername();
+
+ thenReturnedStringEquals("");
+ }
+
+ @Test
+ public void shouldReturnCorrectClientId() throws Exception {
+ givenActivated(getDefaultProperties("test-client", "user"));
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenGetClientId();
+
+ thenReturnedStringEquals("test-client");
+ }
+
+ @Test
+ public void shouldReturnEmptyClientIdIfNotInitialized() throws Exception {
+ givenActivated(getDefaultProperties("test-client", "user"));
+
+ whenGetClientId();
+
+ thenReturnedStringEquals("");
+ }
+
+ // subscribe
+
+ @Test
+ public void shouldThrowKuraNotConnectedExceptionOnSubscribeIfNotConnected() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenSubscribe("topic1", 0);
+
+ thenExceptionOccurred(KuraNotConnectedException.class);
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionOnSubscribeIfNotInitialized() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+
+ whenSubscribe("topic1", 0);
+
+ thenExceptionOccurred(IllegalStateException.class);
+ }
+
+ @Test
+ public void shouldNotFailSubscribeIfClientFails() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenSubscribe("topic1", 0);
+
+ thenClientSubscribed(1, "topic1", 0);
+ }
+
+ @Test
+ public void shouldSubscribeCorrectly() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenSubscribe("topic1", 0);
+
+ thenClientSubscribed(1, "topic1", 0);
+ }
+
+ // unsubscribe
+
+ @Test
+ public void shouldThrowKuraNotConnectedExceptionOnUnsubscribeIfNotConnected() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenUnsubscribe("topic1");
+
+ thenExceptionOccurred(KuraNotConnectedException.class);
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionOnUnsubscribeIfNotInitialized() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+
+ whenUnsubscribe("topic1");
+
+ thenExceptionOccurred(IllegalStateException.class);
+ }
+
+ @Test
+ public void shouldNotFailUnsubscribeIfClientFails() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenUnsubscribe("topic1");
+
+ thenClientUnsubscribed(1, "topic1");
+ }
+
+ @Test
+ public void shouldUnsubscribeCorrectly() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenUnsubscribe("topic1");
+
+ thenClientUnsubscribed(1, "topic1");
+ }
+
+ // publish
+
+ @Test
+ public void shouldThrowKuraNotConnectedExceptionOnPublishIfNotConnected() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(false).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenPublish("topic", "data".getBytes(), 0, false);
+
+ thenExceptionOccurred(KuraNotConnectedException.class);
+ }
+
+ @Test
+ public void shouldThrowIllegalStateExceptionOnPublishIfNotInitialized() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFunctional().isConnected(true).build());
+
+ whenPublish("topic", "data".getBytes(), 0, false);
+
+ thenExceptionOccurred(IllegalStateException.class);
+ }
+
+ @Test
+ public void shouldNotFailPublishIfClientFails() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenPublish("topic", "data".getBytes(), 0, false);
+
+ thenClientPublished(1, "topic", "data".getBytes(), 0, false);
+ thenRetunedNullDataTransportToken();
+ }
+
+ @Test
+ public void shouldReturnNullTokenWhenPublishWithQos0() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenPublish("topic", "data".getBytes(), 0, false);
+
+ thenClientPublished(1, "topic", "data".getBytes(), 0, false);
+ thenRetunedNullDataTransportToken();
+ }
+
+ @Test
+ public void shouldReturnNonNullTokenWhenPublishWithQos1() throws Exception {
+ givenMqttClient(new MockClientBuilder().asFailingWithTimeout().isConnected(true).build());
+ givenInitSparkplugParameters("g1", "h1", 0L);
+
+ whenPublish("topic", "data".getBytes(), 1, false);
+
+ thenClientPublished(1, "topic", "data".getBytes(), 1, false);
+ thenReturnedValidDataTransportToken();
+ }
+
+ // MqttCallback
+
+ @Test
+ public void shouldNotifyListenersOnConnectionLost() {
+ givenDataTransportDataListener(this.listener);
+
+ whenOnConnectionLost();
+
+ thenListenersNotifiedOnConnectionLost(1);
+ }
+
+ @Test
+ public void shouldNotifyListenersOnMessageConfirmedWithQos1() throws Exception {
+ givenDataTransportDataListener(this.listener);
+
+ whenDeliveryComplete(1);
+
+ thenListenersNotifiedOnMessageConfirmed(1);
+ }
+
+ @Test
+ public void shouldNotNotifyListenersOnMessageConfirmedWithQos0() throws Exception {
+ givenDataTransportDataListener(this.listener);
+
+ whenDeliveryComplete(0);
+
+ thenListenersNotifiedOnMessageConfirmed(0);
+ }
+
+ @Test
+ public void shouldNotifyListenersOnMessageArrived() throws Exception {
+ givenDataTransportDataListener(this.listener);
+
+ whenMessageArrived("test", "data".getBytes(), 0, false);
+
+ thenListenersNotifiedOnMessageArrived(1, "test", "data".getBytes(), 0, false);
+ }
+
+ /*
+ * Steps
+ */
+
+ /*
+ * Given
+ */
+
+ private void givenMqttClient(MqttAsyncClient client) {
+ this.client = client;
+ this.transport.setMqttAsyncClient(client);
+ }
+
+ private void givenActivated(Map properties) {
+ this.transport.activate(properties);
+ }
+
+ private void givenInitSparkplugParameters(String groupId, String hostId, long bdSeq) {
+ this.transport.initSparkplugParameters(groupId, hostId, bdSeq);
+ }
+
+ private void givenDataTransportDataListener(DataTransportListener listener) {
+ this.transport.addDataTransportListener(listener);
+ }
+
+ /*
+ * When
+ */
+
+ private void whenUpdate(Map properties) {
+ this.transport.update(properties);
+ }
+
+ private void whenDisconnect(long quiesceTimeout) {
+ this.transport.disconnect(quiesceTimeout);
+ }
+
+ private void whenConnect() {
+ try {
+ this.transport.connect();
+ } catch (Exception e) {
+ this.occurredException = e;
+ }
+ }
+
+ private void whenGetBrokerUrl() {
+ this.returnedString = this.transport.getBrokerUrl();
+ }
+
+ private void whenGetAccountName() {
+ this.returnedString = this.transport.getAccountName();
+ }
+
+ private void whenGetClientId() {
+ this.returnedString = this.transport.getClientId();
+ }
+
+ private void whenGetUsername() {
+ this.returnedString = this.transport.getUsername();
+ }
+
+ private void whenSubscribe(String topic, int qos) {
+ try {
+ this.transport.subscribe(topic, qos);
+ } catch (Exception e) {
+ this.occurredException = e;
+ }
+ }
+
+ private void whenUnsubscribe(String topic) {
+ try {
+ this.transport.unsubscribe(topic);
+ } catch (Exception e) {
+ this.occurredException = e;
+ }
+ }
+
+ private void whenPublish(String topic, byte[] payload, int qos, boolean retain) {
+ try {
+ this.deliveryToken = this.transport.publish(topic, payload, qos, retain);
+ } catch (Exception e) {
+ this.occurredException = e;
+ }
+ }
+
+ private void whenOnConnectionLost() {
+ this.transport.connectionLost(new Throwable());
+ }
+
+ private void whenDeliveryComplete(int qos) throws MqttException {
+ MqttMessage message = mock(MqttMessage.class);
+ when(message.getQos()).thenReturn(qos);
+
+ IMqttDeliveryToken token = mock(IMqttDeliveryToken.class);
+ when(token.getMessage()).thenReturn(message);
+
+ this.transport.deliveryComplete(token);
+ }
+
+ private void whenMessageArrived(String topic, byte[] payload, int qos, boolean retain) throws Exception {
+ MqttMessage message = mock(MqttMessage.class);
+ when(message.getPayload()).thenReturn(payload);
+ when(message.getQos()).thenReturn(qos);
+ when(message.isRetained()).thenReturn(retain);
+
+ this.transport.messageArrived(topic, message);
+ }
+
+ /*
+ * Then
+ */
+
+ private void thenListenerNotifiedOnConfigurationUpdating(int expectedTimes, boolean expectedWasConnected) {
+ verify(this.listener, times(expectedTimes)).onConfigurationUpdating(expectedWasConnected);
+ }
+
+ private void thenListenerNotifiedOnConfigurationUpdated(int expectedTimes, boolean expectedWasConnected) {
+ verify(this.listener, times(expectedTimes)).onConfigurationUpdated(expectedWasConnected);
+ }
+
+ private void thenListenerNotifiedOnDisconnecting(int expectedTimes) {
+ verify(this.listener, times(expectedTimes)).onDisconnecting();
+ }
+
+ private void thenListenerNotifiedOnDisconnected(int expectedTimes) {
+ verify(this.listener, times(expectedTimes)).onDisconnected();
+ }
+
+ private void thenListenerNotifiedConnected(int expectedTimes, boolean expectedNewSession) {
+ verify(this.listener, times(expectedTimes)).onConnectionEstablished(expectedNewSession);
+ }
+
+ private void thenListenersNotifiedOnConnectionLost(int expectedTimes) {
+ verify(this.listener, times(expectedTimes)).onConnectionLost(any(Throwable.class));
+ }
+
+ private void thenListenersNotifiedOnMessageConfirmed(int expectedTimes) {
+ verify(this.listener, times(expectedTimes)).onMessageConfirmed(any(DataTransportToken.class));
+ }
+
+ private void thenListenersNotifiedOnMessageArrived(int expectedTimes, String expectedTopic, byte[] expectedPayload,
+ int expectedQos, boolean expectedRetain) {
+ verify(this.listener, times(expectedTimes)).onMessageArrived(expectedTopic, expectedPayload, expectedQos,
+ expectedRetain);
+ }
+
+ private void thenClientDisconnects(int expectedTimes, long expectedQuiesceTimeout) throws MqttException {
+ verify(this.client, times(expectedTimes)).disconnect(expectedQuiesceTimeout);
+ }
+
+ private void thenClientConnects(int expectedTimes) throws MqttException {
+ verify(this.client, times(expectedTimes)).connect(any(MqttConnectOptions.class));
+ }
+
+ private void thenClientSubscribed(int expectedTimes, String expectedTopic, int expectedQos) throws MqttException {
+ verify(this.client, times(expectedTimes)).subscribe(expectedTopic, expectedQos);
+ }
+
+ private void thenClientUnsubscribed(int expectedTimes, String expectedTopic) throws MqttException {
+ verify(this.client, times(expectedTimes)).unsubscribe(expectedTopic);
+ }
+
+ private void thenClientPublished(int expectedTimes, String expectedTopic, byte[] expectedPayload, int expectedQos,
+ boolean expectedRetain) throws MqttException {
+ verify(this.client, times(expectedTimes)).publish(expectedTopic, expectedPayload, expectedQos, expectedRetain);
+ }
+
+ private void thenRetunedNullDataTransportToken() {
+ assertNull(this.deliveryToken);
+ }
+
+ private void thenReturnedValidDataTransportToken() {
+ assertNull(this.deliveryToken);
+ }
+
+ private void thenExceptionOccurred(Class expectedException) {
+ assertNotNull(this.occurredException);
+ assertEquals(expectedException.getName(), this.occurredException.getClass().getName());
+ }
+
+ private void thenReturnedStringEquals(String expectedString) {
+ assertEquals(expectedString, this.returnedString);
+ }
+
+
+ /*
+ * Utilities
+ */
+
+ @Before
+ public void activateComponent() {
+ givenActivated(getDefaultProperties("test-client", null));
+ }
+
+ private Map getDefaultProperties(String clientId, String username) {
+ Map properties = new HashMap<>();
+ properties.put(SparkplugDataTransportOptions.KEY_SERVER_URIS, "tcp://broker:1883");
+ properties.put(SparkplugDataTransportOptions.KEY_CLIENT_ID, clientId);
+ properties.put(SparkplugDataTransportOptions.KEY_KEEP_ALIVE, 100);
+ properties.put(SparkplugDataTransportOptions.KEY_USERNAME, username);
+ properties.put(SparkplugDataTransportOptions.KEY_PASSWORD, new Password("pass"));
+ properties.put(SparkplugDataTransportOptions.KEY_CONNECTION_TIMEOUT, 100);
+
+ return properties;
+ }
+
+ private class MockClientBuilder {
+
+ private MqttAsyncClient client = mock(MqttAsyncClient.class);
+
+ public MockClientBuilder isConnected(boolean isConnected) {
+ when(this.client.isConnected()).thenReturn(isConnected);
+ return this;
+ }
+
+ public MockClientBuilder asFunctional() throws MqttException {
+ IMqttToken token = mock(IMqttToken.class);
+ IMqttDeliveryToken deliveryToken = mock(IMqttDeliveryToken.class);
+ when(deliveryToken.getMessageId()).thenReturn(0);
+
+ when(this.client.connect(any(MqttConnectOptions.class))).thenReturn(token);
+ when(this.client.disconnect(anyLong())).thenReturn(token);
+ when(this.client.subscribe(anyString(), anyInt())).thenReturn(token);
+ when(this.client.unsubscribe(anyString())).thenReturn(token);
+ when(this.client.publish(anyString(), any(byte[].class), anyInt(), anyBoolean())).thenReturn(deliveryToken);
+ return this;
+ }
+
+ public MockClientBuilder asFailingWithTimeout() throws MqttException {
+ IMqttToken token = mock(IMqttToken.class);
+ doThrow(new MqttException(0)).when(token).waitForCompletion(anyLong());
+
+ when(this.client.connect(any(MqttConnectOptions.class))).thenReturn(token);
+ when(this.client.disconnect(anyLong())).thenReturn(token);
+ when(this.client.subscribe(anyString(), anyInt())).thenReturn(token);
+ when(this.client.unsubscribe(anyString())).thenReturn(token);
+
+ doThrow(new MqttException(0)).when(this.client).publish(anyString(), any(byte[].class), anyInt(),
+ anyBoolean());
+
+ return this;
+ }
+
+ public MockClientBuilder withCurrentServerURI(String currentServerURI) {
+ when(this.client.getCurrentServerURI()).thenReturn(currentServerURI);
+ return this;
+ }
+
+ public MockClientBuilder withServerURI(String serverURI) {
+ when(this.client.getServerURI()).thenReturn(serverURI);
+ return this;
+ }
+
+ public MqttAsyncClient build() {
+ return this.client;
+ }
+
+ }
+
+}
\ No newline at end of file