Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Mar 15, 2024
1 parent 6a5f631 commit 4b9939c
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 77 deletions.
9 changes: 7 additions & 2 deletions dynamic-mapping-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,14 @@
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
<!-- <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<scope>test</scope>
</dependency> -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
12 changes: 8 additions & 4 deletions dynamic-mapping-interface/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency> -->
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.jsonata4java</groupId>
<artifactId>JSONata4Java</artifactId>
Expand Down
6 changes: 5 additions & 1 deletion dynamic-mapping-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<!-- <dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency> -->
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.jsonata4java</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public interface GenericMessageCallback {
void onClose(String closeMessage, Throwable closeException);

void onMessage(ConnectorMessage message) throws Exception;
void onMessage(ConnectorMessage message);

void onError( Throwable errorException);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import dynamic.mapping.processor.inbound.AsynchronousDispatcherInbound;

import org.apache.commons.lang3.mutable.MutableInt;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.joda.time.DateTime;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -220,7 +219,7 @@ public boolean shouldConnect() {
/***
* Subscribe to a topic on the Broker
***/
public abstract void subscribe(String topic, Integer qos) throws MqttException;
public abstract void subscribe(String topic, Integer qos) throws ConnectorException;

/***
* Unsubscribe a topic on the Broker
Expand Down Expand Up @@ -341,7 +340,7 @@ public void upsertActiveSubscription(Mapping mapping) {
mapping.qos.ordinal());
try {
subscribe(mapping.subscriptionTopic, mapping.qos.ordinal());
} catch (MqttException exp) {
} catch (ConnectorException exp) {
log.error("Tenant {} - Exception when subscribing to topic: {}: ", tenant,
mapping.subscriptionTopic, exp);
}
Expand All @@ -363,7 +362,7 @@ public void upsertActiveSubscription(Mapping mapping) {
mapping.qos.ordinal());
try {
subscribe(mapping.subscriptionTopic, mapping.qos.ordinal());
} catch (MqttException exp) {
} catch (ConnectorException exp) {
log.error("Tenant {} - Exception when subscribing to topic: {}: ", tenant,
mapping.subscriptionTopic, exp);
}
Expand Down Expand Up @@ -407,7 +406,7 @@ public void updateActiveSubscriptions(List<Mapping> updatedMappings, boolean res
log.debug("Tenant {} - Subscribing to topic: {}, qos: {}", tenant, topic, qos);
try {
subscribe(topic, qos);
} catch (MqttException exp) {
} catch (ConnectorException exp) {
log.error("Tenant {} - Exception when subscribing to topic: {}: ", tenant, topic, exp);
throw new RuntimeException(exp);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2022 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA,
* and/or its subsidiaries and/or its affiliates and/or their licensors.
*
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @authors Christof Strack, Stefan Witschel
*/

package dynamic.mapping.connector.core.client;

public class ConnectorException extends Exception {
public ConnectorException(String string) {
super(string);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package dynamic.mapping.connector.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Consumer;

import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;

import dynamic.mapping.connector.core.callback.ConnectorMessage;
import dynamic.mapping.connector.core.callback.GenericMessageCallback;
public class MQTTCallback implements MqttCallback {

public class MQTTCallback implements Consumer<Mqtt3Publish> {
GenericMessageCallback genericMessageCallback;
static String TOPIC_LEVEL_SEPARATOR = String.valueOf(MqttTopic.TOPIC_LEVEL_SEPARATOR);
String tenant;
String connectorIdent;

Expand All @@ -17,24 +22,27 @@ public class MQTTCallback implements MqttCallback {
this.connectorIdent = connectorIdent;
}

@Override
public void connectionLost(Throwable throwable) {
genericMessageCallback.onClose(null, throwable);
}
// @Override
// public void connectionLost(Throwable throwable) {
// genericMessageCallback.onClose(null, throwable);
// }

@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
public void accept(Mqtt3Publish mqttMessage) {
ConnectorMessage connectorMessage = new ConnectorMessage();
connectorMessage.setPayload(mqttMessage.getPayload());
Optional<ByteBuffer> ob = mqttMessage.getPayload();
if (ob.isPresent()) {
ByteBuffer bb = ob.get();
byte[] arr = new byte[bb.remaining()];
bb.get(arr);
connectorMessage.setPayload(arr);
}
connectorMessage.setTenant(tenant);
connectorMessage.setSendPayload(true);
connectorMessage.setSendPayload(true);
String topic = String.join(TOPIC_LEVEL_SEPARATOR, mqttMessage.getTopic().getLevels());
connectorMessage.setTopic(topic);
connectorMessage.setConnectorIdent(connectorIdent);
genericMessageCallback.onMessage(connectorMessage);
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

}
}
Loading

0 comments on commit 4b9939c

Please sign in to comment.