Skip to content

Commit

Permalink
added missing libraries
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Mar 15, 2024
1 parent c2e9cdf commit 198bd5f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 39 deletions.
16 changes: 12 additions & 4 deletions dynamic-mapping-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,18 @@
</properties>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>com.google.dagger</groupId>
<artifactId>dagger</artifactId>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ public class MQTTCallback implements Consumer<Mqtt3Publish> {
public void accept(Mqtt3Publish mqttMessage) {
ConnectorMessage connectorMessage = new ConnectorMessage();
if(mqttMessage.getPayload().isPresent()) {
connectorMessage.setPayload(mqttMessage.getPayload().get().array());
ByteBuffer byteBuffer = mqttMessage.getPayload().get();
byte[] byteArray = new byte[byteBuffer.remaining()];
byteBuffer.get(byteArray);
connectorMessage.setPayload(byteArray);
}
connectorMessage.setTenant(tenant);
connectorMessage.setSendPayload(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,12 @@ tenant, isConnected(),
boolean useTLS = (Boolean) connectorConfiguration.getProperties().getOrDefault("useTLS", false);
boolean useSelfSignedCertificate = (Boolean) connectorConfiguration.getProperties()
.getOrDefault("useSelfSignedCertificate", false);
String prefix = useTLS ? "ssl://" : "tcp://";

String mqttHost = (String) connectorConfiguration.getProperties().get("mqttHost");
String clientId = (String) connectorConfiguration.getProperties().get("clientId");
int mqttPort = (Integer) connectorConfiguration.getProperties().get("mqttPort");
String user = (String) connectorConfiguration.getProperties().get("user");
String password = (String) connectorConfiguration.getProperties().get("password");
String broker = prefix + mqttHost + ":"
+ mqttPort;
// mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new
// MemoryPersistence());
Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder();
Expand All @@ -268,12 +266,23 @@ tenant, isConnected(),
// connOpts.setSocketFactory(sslSocketFactory);
} else if (simpleAuthComplete != null) {
Mqtt3SimpleAuth simpleAuth = simpleAuthComplete.build();
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.simpleAuth(simpleAuth).buildBlocking();
if (useTLS) {
mqttClient = Mqtt3Client.builder()
.sslWithDefaultConfig()
.serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.simpleAuth(simpleAuth).buildBlocking();
} else {
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.simpleAuth(simpleAuth).buildBlocking();
}
} else {
mqttClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
Expand All @@ -283,10 +292,9 @@ tenant, isConnected(),
.buildBlocking();
}
mqttCallback = new MQTTCallback(dispatcher, tenant, MQTTClient.getConnectorType());
//Registering Callback
Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync();
mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback);

// Registering Callback
// Mqtt3AsyncClient mqtt3AsyncClient = mqttClient.toAsync();
// mqtt3AsyncClient.publishes(MqttGlobalPublishFilter.ALL, mqttCallback);
Mqtt3ConnAck ack = mqttClient.connectWith().cleanSession(true).keepAlive(60).send();
if (!ack.getReturnCode().equals(Mqtt3ConnAckReturnCode.SUCCESS)) {
throw new ConnectorException("Tenant " + tenant + " - Error connecting to broker:"
Expand Down Expand Up @@ -422,28 +430,23 @@ public String getConnectorIdent() {

@Override
public void subscribe(String topic, Integer qos) throws ConnectorException {
log.debug("Tenant {} - Subscribing on topic: {}", tenant, topic);
log.info("Tenant {} - Subscribing on topic: {}", tenant, topic);
sendSubscriptionEvents(topic, "Subscribing");
if (qos != null) {
//We don't need to add a handler on subscribe using hive client
//mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send();
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> {
log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage());
return null;
});
} else {
//We don't need to add a handler on subscribe using hive client
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> {
log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage());
return null;
});
}
if (qos.equals(null))
qos = 0;

// We don't need to add a handler on subscribe using hive client
// mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send();
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).callback(mqttCallback).send()
.thenRun(() -> {
log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ", tenant, topic,
throwable.getMessage());
return null;
});

}

public void unsubscribe(String topic) throws Exception {
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.dagger</groupId>
<artifactId>dagger</artifactId>
<version>2.51</version>
</dependency>
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>2.1.2</version>
</dependency>

<dependency>
<groupId>com.jayway.jsonpath</groupId>
Expand Down

0 comments on commit 198bd5f

Please sign in to comment.