diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java index a4d95ff3..07e0e6b4 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/core/client/AConnectorClient.java @@ -137,7 +137,8 @@ public void submitInitialize() { public abstract ConnectorSpecification getSpecification(); public void loadConfiguration() { - connectorConfiguration = connectorConfigurationComponent.getConnectorConfiguration(this.getConnectorIdent(), tenant); + connectorConfiguration = connectorConfigurationComponent.getConnectorConfiguration(this.getConnectorIdent(), + tenant); // get the latest serviceConfiguration from the Cumulocity backend in case // someone changed it in the meantime // update the in the registry @@ -463,6 +464,18 @@ public void sendSubscriptionEvents(String topic, String action) { } } + public void connectionLost(String closeMessage, Throwable closeException) { + String tenant = getTenant(); + String connectorIdent = getConnectorIdent(); + if (closeException != null) + log.error("Tenant {} - Connection Lost to broker {}: {}", tenant, connectorIdent, + closeException.getMessage()); + closeException.printStackTrace(); + if (closeMessage != null) + log.info("Tenant {} - Connection Lost to MQTT broker: {}", tenant, closeMessage); + reconnect(); + } + @Data @AllArgsConstructor public static class Certificate { diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java index d6346adb..7d7bb3e0 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java @@ -21,11 +21,6 @@ public class MQTTCallback implements Consumer { this.connectorIdent = connectorIdent; } - // @Override - // public void connectionLost(Throwable throwable) { - // genericMessageCallback.onClose(null, throwable); - // } - @Override public void accept(Mqtt3Publish mqttMessage) { ConnectorMessage connectorMessage = new ConnectorMessage(); diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java index b0f08c68..1b29c6f0 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java @@ -31,20 +31,13 @@ import java.security.cert.CertificateException; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; +import java.time.LocalTime; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; -import javax.validation.constraints.NotNull; - import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; -import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder; import dynamic.mapping.connector.core.ConnectorPropertyType; import dynamic.mapping.connector.core.ConnectorSpecification; import dynamic.mapping.connector.core.client.AConnectorClient; @@ -55,32 +48,21 @@ import dynamic.mapping.processor.model.ProcessingContext; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; -import org.springframework.beans.BeanInstantiationException; -import com.hivemq.client.internal.mqtt.message.MqttMessage; import com.hivemq.client.mqtt.MqttClientSslConfig; import com.hivemq.client.mqtt.MqttClientSslConfigBuilder; -import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; -import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect; +import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext; import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient; import com.hivemq.client.mqtt.mqtt3.Mqtt3Client; -import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientConfig; -import com.hivemq.client.mqtt.mqtt3.message.Mqtt3Message; +import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder; import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth; import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder; import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder.Complete; import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck; import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; -import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe; -import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilder; -import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilderBase; -import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; -import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck; -import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import dynamic.mapping.configuration.ConnectorConfiguration; @@ -247,8 +229,11 @@ tenant, isConnected(), int mqttPort = (Integer) connectorConfiguration.getProperties().get("mqttPort"); String user = (String) connectorConfiguration.getProperties().get("user"); String password = (String) connectorConfiguration.getProperties().get("password"); - // mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new - // MemoryPersistence()); + + Mqtt3ClientBuilder partialBuilder = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) + .identifier(clientId + additionalSubscriptionIdTest); + + // is username & password used Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder(); Complete simpleAuthComplete = null; if (!StringUtils.isEmpty(user)) { @@ -257,45 +242,40 @@ tenant, isConnected(), if (!StringUtils.isEmpty(password) && simpleAuthComplete != null) { simpleAuthComplete = simpleAuthComplete.password(password.getBytes()); } + if (simpleAuthComplete != null) { + Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build(); + partialBuilder = partialBuilder + .simpleAuth(simpleAuth); + } + + // tls configuration if (useSelfSignedCertificate) { - mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) - .identifier(clientId + additionalSubscriptionIdTest).sslConfig(sslConfig) - .buildBlocking(); - ; + partialBuilder = partialBuilder.sslConfig(sslConfig); log.debug("Tenant {} - Using certificate: {}", tenant, cert.getCertInPemFormat()); - // connOpts.setSocketFactory(sslSocketFactory); - } else if (simpleAuthComplete != null) { - Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build(); - if (useTLS) { - mqttClient = Mqtt3Client.builder() - .sslWithDefaultConfig() - .serverHost(mqttHost).serverPort(mqttPort) - .identifier(clientId + additionalSubscriptionIdTest) - // .automaticReconnect(MqttClientAutoReconnect.builder() - // .initialDelay(3000, TimeUnit.MILLISECONDS) - // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) - .simpleAuth(simpleAuth).buildBlocking(); - } else { - mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) - .identifier(clientId + additionalSubscriptionIdTest) - // .automaticReconnect(MqttClientAutoReconnect.builder() - // .initialDelay(3000, TimeUnit.MILLISECONDS) - // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) - .simpleAuth(simpleAuth).buildBlocking(); - } - } else { - mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) - .identifier(clientId + additionalSubscriptionIdTest) - // .automaticReconnect(MqttClientAutoReconnect.builder() - // .initialDelay(3000, TimeUnit.MILLISECONDS) - // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) - .buildBlocking(); + } else if (useTLS) { + partialBuilder = partialBuilder.sslWithDefaultConfig(); } + + // finally build mqttClient + MqttClientDisconnectedContext c; + mqttClient = partialBuilder + .addDisconnectedListener(context -> { + // test if we closed the connection deliberately, otherwise we have to try to + // reconnect + if (connectorConfiguration.enabled) + connectionLost( + "Disconnected from: " + context.getSource().toString(), context.getCause()); + }) + .buildBlocking(); + mqttCallback = new MQTTCallback(dispatcher, tenant, MQTTClient.getConnectorType()); // Registering Callback // Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync(); // mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback); - Mqtt3ConnAck ack = mqttClient.connectWith().cleanSession(true).keepAlive(60).send(); + Mqtt3ConnAck ack = mqttClient.connectWith() + .cleanSession(true) + .keepAlive(60) + .send(); if (!ack.getReturnCode().equals(Mqtt3ConnAckReturnCode.SUCCESS)) { throw new ConnectorException("Tenant " + tenant + " - Error connecting to broker:" + mqttClient.getConfig().getServerHost() + ". Errorcode: " @@ -361,7 +341,6 @@ private void updateConnectorStatusToFailed(Exception e) { @Override public void close() { - } @Override @@ -469,4 +448,5 @@ public void publishMEAO(ProcessingContext context) { public String getConnectorName() { return connectorName; } + } \ No newline at end of file diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/processor/inbound/AsynchronousDispatcherInbound.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/processor/inbound/AsynchronousDispatcherInbound.java index c4e793b3..0d9977df 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/processor/inbound/AsynchronousDispatcherInbound.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/processor/inbound/AsynchronousDispatcherInbound.java @@ -237,15 +237,6 @@ public Future>> processMessage(ConnectorMessage messag @Override public void onClose(String closeMessage, Throwable closeException) { - String tenant = connectorClient.getTenant(); - String connectorIdent = connectorClient.getConnectorIdent(); - if (closeException != null) - log.error("Tenant {} - Connection Lost to broker {}: {}", tenant, connectorIdent, - closeException.getMessage()); - closeException.printStackTrace(); - if (closeMessage != null) - log.info("Tenant {} - Connection Lost to MQTT broker: {}", tenant, closeMessage); - connectorClient.reconnect(); } @Override