diff --git a/dynamic-mapping-service/pom.xml b/dynamic-mapping-service/pom.xml index e9f8d2dd..c56f2d02 100644 --- a/dynamic-mapping-service/pom.xml +++ b/dynamic-mapping-service/pom.xml @@ -38,10 +38,18 @@ - - com.fasterxml.jackson.datatype - jackson-datatype-joda - + + com.google.dagger + dagger + + + org.jctools + jctools-core + + + com.fasterxml.jackson.datatype + jackson-datatype-joda + org.json json diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java index c935b29e..102b7f18 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTCallback.java @@ -31,7 +31,10 @@ public class MQTTCallback implements Consumer { public void accept(Mqtt3Publish mqttMessage) { ConnectorMessage connectorMessage = new ConnectorMessage(); if(mqttMessage.getPayload().isPresent()) { - connectorMessage.setPayload(mqttMessage.getPayload().get().array()); + ByteBuffer byteBuffer = mqttMessage.getPayload().get(); + byte[] byteArray = new byte[byteBuffer.remaining()]; + byteBuffer.get(byteArray); + connectorMessage.setPayload(byteArray); } connectorMessage.setTenant(tenant); connectorMessage.setSendPayload(true); diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java index 0a51d05e..d3c208ff 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java @@ -235,14 +235,12 @@ tenant, isConnected(), boolean useTLS = (Boolean) connectorConfiguration.getProperties().getOrDefault("useTLS", false); boolean useSelfSignedCertificate = (Boolean) connectorConfiguration.getProperties() .getOrDefault("useSelfSignedCertificate", false); - String prefix = useTLS ? "ssl://" : "tcp://"; + String mqttHost = (String) connectorConfiguration.getProperties().get("mqttHost"); String clientId = (String) connectorConfiguration.getProperties().get("clientId"); int mqttPort = (Integer) connectorConfiguration.getProperties().get("mqttPort"); String user = (String) connectorConfiguration.getProperties().get("user"); String password = (String) connectorConfiguration.getProperties().get("password"); - String broker = prefix + mqttHost + ":" - + mqttPort; // mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new // MemoryPersistence()); Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder(); @@ -268,12 +266,23 @@ tenant, isConnected(), // connOpts.setSocketFactory(sslSocketFactory); } else if (simpleAuthComplete != null) { Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build(); - mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) - .identifier(clientId + additionalSubscriptionIdTest) - // .automaticReconnect(MqttClientAutoReconnect.builder() - // .initialDelay(3000, TimeUnit.MILLISECONDS) - // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) - .simpleAuth(simpleAuth).buildBlocking(); + if (useTLS) { + mqttClient = Mqtt3Client.builder() + .sslWithDefaultConfig() + .serverHost(mqttHost).serverPort(mqttPort) + .identifier(clientId + additionalSubscriptionIdTest) + // .automaticReconnect(MqttClientAutoReconnect.builder() + // .initialDelay(3000, TimeUnit.MILLISECONDS) + // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) + .simpleAuth(simpleAuth).buildBlocking(); + } else { + mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) + .identifier(clientId + additionalSubscriptionIdTest) + // .automaticReconnect(MqttClientAutoReconnect.builder() + // .initialDelay(3000, TimeUnit.MILLISECONDS) + // .maxDelay(10000, TimeUnit.MILLISECONDS).build()) + .simpleAuth(simpleAuth).buildBlocking(); + } } else { mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort) .identifier(clientId + additionalSubscriptionIdTest) @@ -283,10 +292,9 @@ tenant, isConnected(), .buildBlocking(); } mqttCallback = new MQTTCallback(dispatcher, tenant, MQTTClient.getConnectorType()); - //Registering Callback - Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync(); - mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback); - + // Registering Callback + // Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync(); + // mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback); Mqtt3ConnAck ack = mqttClient.connectWith().cleanSession(true).keepAlive(60).send(); if (!ack.getReturnCode().equals(Mqtt3ConnAckReturnCode.SUCCESS)) { throw new ConnectorException("Tenant " + tenant + " - Error connecting to broker:" @@ -422,28 +430,23 @@ public String getConnectorIdent() { @Override public void subscribe(String topic, Integer qos) throws ConnectorException { - log.debug("Tenant {} - Subscribing on topic: {}", tenant, topic); + log.info("Tenant {} - Subscribing on topic: {}", tenant, topic); sendSubscriptionEvents(topic, "Subscribing"); - if (qos != null) { - //We don't need to add a handler on subscribe using hive client - //mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send(); - Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync(); - asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> { - log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); - }).exceptionally(throwable -> { - log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage()); - return null; - }); - } else { - //We don't need to add a handler on subscribe using hive client - Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync(); - asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> { - log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); - }).exceptionally(throwable -> { - log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage()); - return null; - }); - } + if (qos.equals(null)) + qos = 0; + + // We don't need to add a handler on subscribe using hive client + // mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send(); + Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync(); + asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).callback(mqttCallback).send() + .thenRun(() -> { + log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); + }).exceptionally(throwable -> { + log.error("Tenant {} - Failed to subscribe on topic {} with error: ", tenant, topic, + throwable.getMessage()); + return null; + }); + } public void unsubscribe(String topic) throws Exception { diff --git a/pom.xml b/pom.xml index 81c1cef9..eb83723d 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,16 @@ + + com.google.dagger + dagger + 2.51 + + + org.jctools + jctools-core + 2.1.2 + com.jayway.jsonpath