Skip to content

Commit

Permalink
feat(sparkplug): added DataTransportLayer
Browse files Browse the repository at this point in the history
Signed-off-by: Marcello Martina <[email protected]>
  • Loading branch information
marcellorinaldo committed Dec 22, 2023
1 parent 4b06742 commit 5996e34
Show file tree
Hide file tree
Showing 11 changed files with 1,469 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
cardinality="1..1"
bind="setEventAdmin"/>

<reference interface="org.eclipse.kura.data.DataTransportService"
name="DataTransportService"
policy="static"
cardinality="1..1"
bind="setDataTransportService"/>

<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="Boolean" value="true"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<provide interface="org.eclipse.kura.data.DataTransportService"/>
<provide interface="org.eclipse.kura.configuration.ConfigurableComponent"/>
</service>

<reference name="DataTransportListener"
policy="dynamic"
cardinality="0..n"
interface="org.eclipse.kura.data.DataTransportListener"/>

<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="Boolean" value="true"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,61 @@
name="SparkplugDataTransport"
description="Data Transport layer configuration.">

<Icon resource="MqttDataTransport" size="32"/>

<AD id="server.uris"
name="Server URIs"
type="String"
cardinality="0"
required="true"
default="tcp://broker1-url:1883"
description="List of space-separated URLs of the MQTT brokers to connect to.
When an attempt to connect is initiated the client will start with the
first server in the list and work through the list until a connection
is established with a server. If a connection cannot be made to any of
the servers then the connect attempt fails.
Supported types of connection are tcp: and ssl:."/>

<AD id="client.id"
name="Client ID"
type="String"
cardinality="0"
required="true"
default="client"
description="Client identifier to be used when connecting to the MQTT broker."/>

<AD id="username"
name="Username"
type="String"
cardinality="0"
required="false"
default=""
description="Username to be used when connecting to the MQTT broker."/>

<AD id="password"
name="Password"
type="Password"
cardinality="0"
required="false"
default=""
description="Password to be used when connecting to the MQTT broker."/>

<AD id="keep.alive"
name="Keep Alive Interval"
type="Integer"
cardinality="0"
required="true"
default="60"
description="Frequency in seconds for the periodic MQTT PING message."/>

<AD id="connection.timeout"
name="Connection Timeout"
type="Integer"
cardinality="0"
required="true"
default="30"
description="Timeout used for all interactions with the MQTT broker."/>

</OCD>

<Designate pid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport;
import org.eclipse.kura.cloudconnection.subscriber.listener.CloudSubscriberListener;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.configuration.ConfigurationService;
import org.eclipse.kura.data.DataService;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.listener.DataServiceListener;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
Expand All @@ -53,6 +55,7 @@ public class SparkplugCloudEndpoint

private DataService dataService;
private EventAdmin eventAdmin;
private SparkplugDataTransport dataTransport;

public void setDataService(DataService dataService) {
this.dataService = dataService;
Expand All @@ -62,6 +65,14 @@ public void setEventAdmin(EventAdmin eventAdmin) {
this.eventAdmin = eventAdmin;
}

public void setDataTransportService(DataTransportService dataTransport) {
if (dataTransport instanceof SparkplugDataTransport) {
this.dataTransport = (SparkplugDataTransport) dataTransport;
} else {
throw new IllegalStateException("The bound DataTransport reference is not a SparkplugDataTransport");
}
}

public void activate(Map<String, Object> properties) {
this.kuraServicePid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
logger.info("{} - Activating", this.kuraServicePid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void createConfiguration(String pid) throws KuraException {
cloudEndpointProperties.put(DATA_SERVICE_REFERENCE_NAME,
String.format(REFERENCE_TARGET_VALUE_FORMAT, dataServicePid));
cloudEndpointProperties.put(KURA_CLOUD_CONNECTION_FACTORY_PID, FACTORY_PID);
cloudEndpointProperties.put(DATA_TRANSPORT_SERVICE_REFERENCE_NAME,
String.format(REFERENCE_TARGET_VALUE_FORMAT, dataTransportServicePid));

this.configurationService.createFactoryConfiguration(CLOUD_ENDPOINT_FACTORY_PID, pid, cloudEndpointProperties,
false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*******************************************************************************
* Copyright (c) 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.message;

import java.util.Date;

import org.eclipse.tahu.protobuf.SparkplugBProto.DataType;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload;

public class SparkplugPayloads {

private SparkplugPayloads() {
}

public static byte[] getNodeDeathPayload(long bdSeq) {
return new SparkplugBProtobufPayloadBuilder().withBdSeq(bdSeq, new Date().getTime()).build();
}

public static byte[] getNodeBirthPayload(long bdSeq, long seq) {
long timestamp = new Date().getTime();

Payload.Builder protoMsg = Payload.newBuilder();

Payload.Metric.Builder bdSeqMetric = Payload.Metric.newBuilder();
bdSeqMetric.setName("bdSeq");
bdSeqMetric.setLongValue(bdSeq);
bdSeqMetric.setDatatype(DataType.Int64.getNumber());
bdSeqMetric.setTimestamp(timestamp);
protoMsg.addMetrics(bdSeqMetric.build());

Payload.Metric.Builder rebirthMetric = Payload.Metric.newBuilder();
rebirthMetric.setName("Node Control/Rebirth");
rebirthMetric.setBooleanValue(false);
rebirthMetric.setDatatype(DataType.Boolean_VALUE);
protoMsg.addMetrics(rebirthMetric.build());

protoMsg.setSeq(seq);
protoMsg.setTimestamp(timestamp);

return protoMsg.build().toByteArray();
}

}
Loading

0 comments on commit 5996e34

Please sign in to comment.