Skip to content

Commit

Permalink
feat(sparkplug): Edge Node implementation at Data Transport level (#5098
Browse files Browse the repository at this point in the history
)

* feat(sparkplug): Edge Node implementation at DataTransport level

Signed-off-by: Marcello Martina <[email protected]>

* fix: message handler threading rework

Signed-off-by: Marcello Martina <[email protected]>

* chrore: changed executor type

Signed-off-by: Marcello Martina <[email protected]>

* fix: proper thread shutdown on update

Signed-off-by: Marcello Martina <[email protected]>

* feat: added example device for testing purposes

Signed-off-by: Marcello Martina <[email protected]>

* feat: added SessionStatus for managing state transitions

Signed-off-by: Marcello Martina <[email protected]>

* refactor: renaming

Signed-off-by: Marcello Martina <[email protected]>

* refactor: renaming

Signed-off-by: Marcello Martina <[email protected]>

* refactor: better SessionStatus management

Signed-off-by: Marcello Martina <[email protected]>

* fix: made BdSeqCounter thread safe

Signed-off-by: Marcello Martina <[email protected]>

* refactor: replaced messages queue with ExecutorService submits

Signed-off-by: Marcello Martina <[email protected]>

* refactor: better error handling on connect

Signed-off-by: Marcello Martina <[email protected]>

* fix: confirming session outside the client

Signed-off-by: Marcello Martina <[email protected]>

* refactor: simplified BdSeqCounter

Signed-off-by: Marcello Martina <[email protected]>

* refactor: better options class, added logs, moved session estabilish into client

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases and refactor of SparkplugDataTransportOptionsTest

Signed-off-by: Marcello Martina <[email protected]>

* refactor: removed unecessary reference to DataTransportService in CloudEndpoint

Signed-off-by: Marcello Martina <[email protected]>

* test: fixed CloudEndpoint tests

Signed-off-by: Marcello Martina <[email protected]>

* test: replaced unit test with integration tests for SparkplugDataTransportTest

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases, removed negative test verification due to spurious messages

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases

Signed-off-by: Marcello Martina <[email protected]>

* fix: using Random to generate int values

Signed-off-by: Marcello Martina <[email protected]>

---------

Signed-off-by: Marcello Martina <[email protected]>
  • Loading branch information
marcellorinaldo authored Jan 22, 2024
1 parent 0445dee commit 3488d02
Show file tree
Hide file tree
Showing 17 changed files with 1,350 additions and 962 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2023 Eurotech and/or its affiliates and others
Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -42,12 +42,6 @@
cardinality="1..1"
bind="setEventAdmin"/>

<reference interface="org.eclipse.kura.data.DataTransportService"
name="DataTransportService"
policy="static"
cardinality="1..1"
bind="setDataTransportService"/>

<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="Boolean" value="true"/>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2023 Eurotech and/or its affiliates and others
Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
Expand All @@ -20,19 +20,41 @@
description="Data Transport layer configuration.">

<Icon resource="MqttDataTransport" size="32"/>

<AD id="group.id"
name="Sparkplug Group ID"
type="String"
cardinality="0"
required="true"
default="group"
description="Sparkplug Group identifier to which this Sparkplug Edge Node belongs."/>

<AD id="node.id"
name="Sparkplug Edge Node ID"
type="String"
cardinality="0"
required="true"
default="node"
description="Sparkplug Edge Node identifier to use for this Cloud Connection."/>

<AD id="primary.host.application.id"
name="Sparkplug Primary Host Application ID"
type="String"
cardinality="0"
required="false"
default=""
description="Sparkplug Primary Host Application to associate with this Sparkplug Edge Node."/>

<AD id="server.uris"
name="Server URIs"
type="String"
cardinality="0"
required="true"
default="tcp://broker1-url:1883"
description="List of space-separated URLs of the MQTT brokers to connect to.
When an attempt to connect is initiated the client will start with the
first server in the list and work through the list until a connection
is established with a server. If a connection cannot be made to any of
the servers then the connect attempt fails.
Supported types of connection are tcp: and ssl:."/>
description="List of space-separated URIs of the MQTT brokers to connect to.
Supported types of connection are tcp: and ssl:. URIs must not end with /.
If a primary.host.application.id has been set, the client will cycle
over the list until a Primary Host Application becomes online."/>

<AD id="client.id"
name="Client ID"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 Eurotech and/or its affiliates and others
* Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -28,13 +28,15 @@
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport.SparkplugDataTransport;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugPayloads;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugTopics;
import org.eclipse.kura.cloudconnection.subscriber.listener.CloudSubscriberListener;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.configuration.ConfigurationService;
import org.eclipse.kura.data.DataService;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.listener.DataServiceListener;
import org.eclipse.kura.type.StringValue;
import org.eclipse.kura.type.TypedValue;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
Expand All @@ -55,7 +57,6 @@ public class SparkplugCloudEndpoint

private DataService dataService;
private EventAdmin eventAdmin;
private SparkplugDataTransport dataTransport;

public void setDataService(DataService dataService) {
this.dataService = dataService;
Expand All @@ -65,14 +66,6 @@ public void setEventAdmin(EventAdmin eventAdmin) {
this.eventAdmin = eventAdmin;
}

public void setDataTransportService(DataTransportService dataTransport) {
if (dataTransport instanceof SparkplugDataTransport) {
this.dataTransport = (SparkplugDataTransport) dataTransport;
} else {
throw new IllegalStateException("The bound DataTransport reference is not a SparkplugDataTransport");
}
}

public void activate(Map<String, Object> properties) {
this.kuraServicePid = (String) properties.get(ConfigurationService.KURA_SERVICE_PID);
logger.info("{} - Activating", this.kuraServicePid);
Expand Down Expand Up @@ -172,12 +165,32 @@ public void unregisterCloudConnectionListener(CloudConnectionListener cloudConne
@Override
public void onConnectionEstablished() {
logger.debug("{} - Connection estabilished", this.kuraServicePid);

sendExampleDeviceBirth();

this.cloudConnectionListeners.forEach(listener -> callSafely(listener::onConnectionEstablished));
postConnectionChangeEvent(true);

// TO DO: init subscriptions
}

private void sendExampleDeviceBirth() {
try {
this.dataService.subscribe(SparkplugTopics.getDeviceCommandTopic("g1", "n1", "d1"), 1);

Map<String, TypedValue<?>> metrics = new HashMap<>();
TypedValue<String> value = new StringValue("test.value");
metrics.put("test.key", value);

String topic = SparkplugTopics.getDeviceBirthTopic("g1", "n1", "d1");
byte[] payload = SparkplugPayloads.getDeviceBirthPayload(1, metrics);

this.dataService.publish(topic, payload, 0, false, 7);
} catch (KuraException e) {
logger.error("Error in example", e);
}
}

@Override
public void onDisconnecting() {
// nothing to do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2023 Eurotech and/or its affiliates and others
* Copyright (c) 2023, 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -13,12 +13,20 @@
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.message;

import java.util.Date;
import java.util.Map;
import java.util.Map.Entry;

import org.eclipse.kura.type.TypedValue;
import org.eclipse.tahu.protobuf.SparkplugBProto.DataType;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload;
import org.eclipse.tahu.protobuf.SparkplugBProto.Payload.Metric;

import com.google.protobuf.InvalidProtocolBufferException;

public class SparkplugPayloads {

public static final String NODE_CONTROL_REBIRTH_METRIC_NAME = "Node Control/Rebirth";

private SparkplugPayloads() {
}

Expand All @@ -29,25 +37,39 @@ public static byte[] getNodeDeathPayload(long bdSeq) {
public static byte[] getNodeBirthPayload(long bdSeq, long seq) {
long timestamp = new Date().getTime();

Payload.Builder protoMsg = Payload.newBuilder();
SparkplugBProtobufPayloadBuilder payloadBuilder = new SparkplugBProtobufPayloadBuilder();
payloadBuilder.withBdSeq(bdSeq, timestamp);
payloadBuilder.withMetric(NODE_CONTROL_REBIRTH_METRIC_NAME, false, DataType.Boolean, timestamp);
payloadBuilder.withSeq(seq);
payloadBuilder.withTimestamp(timestamp);

Payload.Metric.Builder bdSeqMetric = Payload.Metric.newBuilder();
bdSeqMetric.setName("bdSeq");
bdSeqMetric.setLongValue(bdSeq);
bdSeqMetric.setDatatype(DataType.Int64.getNumber());
bdSeqMetric.setTimestamp(timestamp);
protoMsg.addMetrics(bdSeqMetric.build());
return payloadBuilder.build();
}

Payload.Metric.Builder rebirthMetric = Payload.Metric.newBuilder();
rebirthMetric.setName("Node Control/Rebirth");
rebirthMetric.setBooleanValue(false);
rebirthMetric.setDatatype(DataType.Boolean_VALUE);
protoMsg.addMetrics(rebirthMetric.build());
public static byte[] getDeviceBirthPayload(long seq, Map<String, TypedValue<?>> metrics) {
long timestamp = new Date().getTime();

protoMsg.setSeq(seq);
protoMsg.setTimestamp(timestamp);
SparkplugBProtobufPayloadBuilder payloadBuilder = new SparkplugBProtobufPayloadBuilder();
for (Entry<String, TypedValue<?>> metricEntry : metrics.entrySet()) {
payloadBuilder.withMetric(metricEntry.getKey(), metricEntry.getValue(), timestamp);
}

payloadBuilder.withSeq(seq);
payloadBuilder.withTimestamp(timestamp);

return payloadBuilder.build();
}

return protoMsg.build().toByteArray();
public static boolean getBooleanMetric(String metricName, byte[] rawPayload)
throws InvalidProtocolBufferException, NoSuchFieldException {
Payload payload = Payload.parseFrom(rawPayload);
for (Metric metric : payload.getMetricsList()) {
if (metric.getName().equals(metricName)) {
return metric.getBooleanValue();
}
}

throw new NoSuchFieldException("Metric " + metricName + " not found in payload");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.transport;

public class BdSeqCounter {

private int bdSeq = 0;

public synchronized void next() {
if (this.bdSeq == 255) {
this.bdSeq = 0;
} else {
this.bdSeq = this.bdSeq + 1;
}
}

public synchronized int getCurrent() {
return this.bdSeq;
}

}
Loading

0 comments on commit 3488d02

Please sign in to comment.