Skip to content

Commit

Permalink
added listener when connection is lost
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Mar 16, 2024
1 parent 4605cd4 commit 6f5c257
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public void submitInitialize() {
public abstract ConnectorSpecification getSpecification();

public void loadConfiguration() {
connectorConfiguration = connectorConfigurationComponent.getConnectorConfiguration(this.getConnectorIdent(), tenant);
connectorConfiguration = connectorConfigurationComponent.getConnectorConfiguration(this.getConnectorIdent(),
tenant);
// get the latest serviceConfiguration from the Cumulocity backend in case
// someone changed it in the meantime
// update the in the registry
Expand Down Expand Up @@ -463,6 +464,18 @@ public void sendSubscriptionEvents(String topic, String action) {
}
}

public void connectionLost(String closeMessage, Throwable closeException) {
String tenant = getTenant();
String connectorIdent = getConnectorIdent();
if (closeException != null)
log.error("Tenant {} - Connection Lost to broker {}: {}", tenant, connectorIdent,
closeException.getMessage());
closeException.printStackTrace();
if (closeMessage != null)
log.info("Tenant {} - Connection Lost to MQTT broker: {}", tenant, closeMessage);
reconnect();
}

@Data
@AllArgsConstructor
public static class Certificate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ public class MQTTCallback implements Consumer<Mqtt3Publish> {
this.connectorIdent = connectorIdent;
}

// @Override
// public void connectionLost(Throwable throwable) {
// genericMessageCallback.onClose(null, throwable);
// }

@Override
public void accept(Mqtt3Publish mqttMessage) {
ConnectorMessage connectorMessage = new ConnectorMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,13 @@
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.validation.constraints.NotNull;

import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder;
import dynamic.mapping.connector.core.ConnectorPropertyType;
import dynamic.mapping.connector.core.ConnectorSpecification;
import dynamic.mapping.connector.core.client.AConnectorClient;
Expand All @@ -55,32 +48,21 @@
import dynamic.mapping.processor.model.ProcessingContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.springframework.beans.BeanInstantiationException;

import com.hivemq.client.internal.mqtt.message.MqttMessage;
import com.hivemq.client.mqtt.MqttClientSslConfig;
import com.hivemq.client.mqtt.MqttClientSslConfigBuilder;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect;
import com.hivemq.client.mqtt.lifecycle.MqttClientDisconnectedContext;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientConfig;
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3Message;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder.Complete;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilder;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3SubscribeBuilderBase;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import dynamic.mapping.configuration.ConnectorConfiguration;
Expand Down Expand Up @@ -247,8 +229,11 @@ tenant, isConnected(),
int mqttPort = (Integer) connectorConfiguration.getProperties().get("mqttPort");
String user = (String) connectorConfiguration.getProperties().get("user");
String password = (String) connectorConfiguration.getProperties().get("password");
// mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new
// MemoryPersistence());

Mqtt3ClientBuilder partialBuilder = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest);

// is username & password used
Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder();
Complete simpleAuthComplete = null;
if (!StringUtils.isEmpty(user)) {
Expand All @@ -257,45 +242,40 @@ tenant, isConnected(),
if (!StringUtils.isEmpty(password) && simpleAuthComplete != null) {
simpleAuthComplete = simpleAuthComplete.password(password.getBytes());
}
if (simpleAuthComplete != null) {
Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build();
partialBuilder = partialBuilder
.simpleAuth(simpleAuth);
}

// tls configuration
if (useSelfSignedCertificate) {
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest).sslConfig(sslConfig)
.buildBlocking();
;
partialBuilder = partialBuilder.sslConfig(sslConfig);
log.debug("Tenant {} - Using certificate: {}", tenant, cert.getCertInPemFormat());
// connOpts.setSocketFactory(sslSocketFactory);
} else if (simpleAuthComplete != null) {
Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build();
if (useTLS) {
mqttClient = Mqtt3Client.builder()
.sslWithDefaultConfig()
.serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.simpleAuth(simpleAuth).buildBlocking();
} else {
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.simpleAuth(simpleAuth).buildBlocking();
}
} else {
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.buildBlocking();
} else if (useTLS) {
partialBuilder = partialBuilder.sslWithDefaultConfig();
}

// finally build mqttClient
MqttClientDisconnectedContext c;
mqttClient = partialBuilder
.addDisconnectedListener(context -> {
// test if we closed the connection deliberately, otherwise we have to try to
// reconnect
if (connectorConfiguration.enabled)
connectionLost(
"Disconnected from: " + context.getSource().toString(), context.getCause());
})
.buildBlocking();

mqttCallback = new MQTTCallback(dispatcher, tenant, MQTTClient.getConnectorType());
// Registering Callback
// Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync();
// mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback);
Mqtt3ConnAck ack = mqttClient.connectWith().cleanSession(true).keepAlive(60).send();
Mqtt3ConnAck ack = mqttClient.connectWith()
.cleanSession(true)
.keepAlive(60)
.send();
if (!ack.getReturnCode().equals(Mqtt3ConnAckReturnCode.SUCCESS)) {
throw new ConnectorException("Tenant " + tenant + " - Error connecting to broker:"
+ mqttClient.getConfig().getServerHost() + ". Errorcode: "
Expand Down Expand Up @@ -361,7 +341,6 @@ private void updateConnectorStatusToFailed(Exception e) {

@Override
public void close() {

}

@Override
Expand Down Expand Up @@ -469,4 +448,5 @@ public void publishMEAO(ProcessingContext<?> context) {
public String getConnectorName() {
return connectorName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -237,15 +237,6 @@ public Future<List<ProcessingContext<?>>> processMessage(ConnectorMessage messag

@Override
public void onClose(String closeMessage, Throwable closeException) {
String tenant = connectorClient.getTenant();
String connectorIdent = connectorClient.getConnectorIdent();
if (closeException != null)
log.error("Tenant {} - Connection Lost to broker {}: {}", tenant, connectorIdent,
closeException.getMessage());
closeException.printStackTrace();
if (closeMessage != null)
log.info("Tenant {} - Connection Lost to MQTT broker: {}", tenant, closeMessage);
connectorClient.reconnect();
}

@Override
Expand Down

0 comments on commit 6f5c257

Please sign in to comment.