Skip to content

Commit

Permalink
migrated the two test clients to hiveMQ lib
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y committed Mar 15, 2024
1 parent 09c28e2 commit f5dbde4
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,69 +21,64 @@

package dynamic.mapping;

import com.hivemq.client.internal.mqtt.message.MqttMessage;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth;

import dynamic.mapping.processor.extension.external.CustomEventOuter;
import dynamic.mapping.processor.extension.external.CustomEventOuter.CustomEvent;


public class ProtobufPahoClient {
Mqtt3BlockingClient testClient;
static String broker_host = System.getenv("broker");
static Integer broker_port = Integer.valueOf(System.getenv("broker_port"));
static String client_id = System.getenv("client_id");
static String broker_username = System.getenv("broker_username");
static String broker_password = System.getenv("broker_password");

public ProtobufPahoClient(Mqtt3BlockingClient sampleClient) {
testClient = sampleClient;
}

public static void main(String[] args) {

ProtobufPahoClient client = new ProtobufPahoClient();
Mqtt3SimpleAuth simpleAuth = Mqtt3SimpleAuth.builder().username(broker_username)
.password(broker_password.getBytes()).build();
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder()
.serverHost(broker_host)
.serverPort(broker_port)
.identifier(client_id)
.simpleAuth(simpleAuth)
.sslWithDefaultConfig()
.buildBlocking();
ProtobufPahoClient client = new ProtobufPahoClient(sampleClient);
client.testSendEvent();
}

private void testSendEvent() {
int qos = 0;
String broker = System.getenv("broker");
String client_id = System.getenv("client_id");
String broker_username = System.getenv("broker_username");
String broker_password = System.getenv("broker_password");
String topic2 = "protobuf/event";

try {
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder().serverHost(mqttHost).serverPort(mqttPort)
.identifier(clientId + additionalSubscriptionIdTest)
// .automaticReconnect(MqttClientAutoReconnect.builder()
// .initialDelay(3000, TimeUnit.MILLISECONDS)
// .maxDelay(10000, TimeUnit.MILLISECONDS).build())
.buildBlocking();;
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(broker_username);
connOpts.setPassword(broker_password.toCharArray());
connOpts.setCleanSession(true);
String topic = "protobuf/event";

System.out.println("Connecting to broker: " + broker);
System.out.println("Connecting to broker: ssl://" + broker_host + ":" + broker_port);

sampleClient.connect(connOpts);
testClient.connect();

System.out.println("Publishing message: :::");
System.out.println("Publishing message: :::");

CustomEventOuter.CustomEvent proto = CustomEvent.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setTxt("Dummy Text")
.setEventType("c8y_ProtobufEventType")
.setTimestamp(System.currentTimeMillis())
.build();
CustomEventOuter.CustomEvent proto = CustomEvent.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setTxt("Dummy Text")
.setEventType("c8y_ProtobufEventType")
.setTimestamp(System.currentTimeMillis())
.build();

MqttMessage message = new MqttMessage(proto.toByteArray());
message.setQos(qos);
sampleClient.publish(topic2, message);
Mqtt3AsyncClient sampleClientAsync = testClient.toAsync();
sampleClientAsync.publishWith().topic(topic).qos(MqttQos.AT_LEAST_ONCE).payload(proto.toByteArray()).send();

System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
//System.exit(0);

} catch (MqttException me) {
System.out.println("Exception:" + me.getMessage());
me.printStackTrace();
}
System.out.println("Message published");
testClient.disconnect();
System.out.println("Disconnected");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,115 +21,97 @@

package dynamic.mapping;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth;

import dynamic.mapping.processor.extension.internal.InternalCustomAlarmOuter;
import dynamic.mapping.processor.extension.internal.InternalCustomAlarmOuter.InternalCustomAlarm;
import dynamic.mapping.processor.processor.fixed.StaticCustomMeasurementOuter;
import dynamic.mapping.processor.processor.fixed.StaticCustomMeasurementOuter.StaticCustomMeasurement;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ProtobufPahoClient {

static MemoryPersistence persistence = new MemoryPersistence();
Mqtt3BlockingClient testClient;
static String broker_host = System.getenv("broker");
static Integer broker_port = Integer.valueOf(System.getenv("broker_port"));
static String client_id = System.getenv("client_id");
static String broker_username = System.getenv("broker_username");
static String broker_password = System.getenv("broker_password");

public ProtobufPahoClient(Mqtt3BlockingClient sampleClient) {
testClient = sampleClient;
}

public static void main(String[] args) {

ProtobufPahoClient client = new ProtobufPahoClient();
Mqtt3SimpleAuth simpleAuth = Mqtt3SimpleAuth.builder().username(broker_username)
.password(broker_password.getBytes()).build();
Mqtt3BlockingClient sampleClient = Mqtt3Client.builder()
.serverHost(broker_host)
.serverPort(broker_port)
.identifier(client_id)
.simpleAuth(simpleAuth)
.sslWithDefaultConfig()
.buildBlocking();
ProtobufPahoClient client = new ProtobufPahoClient(sampleClient);
client.testSendMeasurement();
client.testSendAlarm();

}

private void testSendMeasurement() {
int qos = 0;
String broker = System.getenv("broker");
String client_id = System.getenv("client_id");
String broker_username = System.getenv("broker_username");
String broker_password = System.getenv("broker_password");
try {
String topic1 = "protobuf/measurement";
MqttClient sampleClient = new MqttClient(broker, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(broker_username);
connOpts.setPassword(broker_password.toCharArray());
connOpts.setCleanSession(true);

System.out.println("Connecting to broker: " + broker);

sampleClient.connect(connOpts);

System.out.println("Publishing message: :::");

StaticCustomMeasurementOuter.StaticCustomMeasurement proto = StaticCustomMeasurement.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setUnit("C")
.setMeasurementType("c8y_GenericMeasurement")
.setValue(99.7F)
.build();

MqttMessage message = new MqttMessage(proto.toByteArray());
message.setQos(qos);
sampleClient.publish(topic1, message);

System.out.println("Message published");
sampleClient.disconnect();
sampleClient.close();
System.out.println("Disconnected");
// System.exit(0);

} catch (MqttException me) {
System.out.println("Exception:" + me.getMessage());
me.printStackTrace();
}

String topic = "protobuf/measurement";
System.out.println("Connecting to broker: ssl://" + broker_host + ":" + broker_port);
testClient.connect();

System.out.println("Publishing message: :::");

StaticCustomMeasurementOuter.StaticCustomMeasurement proto = StaticCustomMeasurement.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setUnit("C")
.setMeasurementType("c8y_GenericMeasurement")
.setValue(99.7F)
.build();

Mqtt3AsyncClient sampleClientAsync = testClient.toAsync();
sampleClientAsync.publishWith().topic(topic).qos(MqttQos.AT_LEAST_ONCE).payload(proto.toByteArray()).send();

System.out.println("Message published");
testClient.disconnect();
System.out.println("Disconnected");

}

private void testSendAlarm() {
int qos = 0;
String broker = System.getenv("broker");
String client_id = System.getenv("client_id");
String broker_username = System.getenv("broker_username");
String broker_password = System.getenv("broker_password");
String topic2 = "protobuf/alarm";

try {
MqttClient sampleClient = new MqttClient(broker, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(broker_username);
connOpts.setPassword(broker_password.toCharArray());
connOpts.setCleanSession(true);

System.out.println("Connecting to broker: " + broker);

sampleClient.connect(connOpts);

System.out.println("Publishing message: :::");

InternalCustomAlarmOuter.InternalCustomAlarm proto = InternalCustomAlarm.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setTxt("Dummy Text")
.setTimestamp(System.currentTimeMillis())
.setAlarmType("c8y_ProtobufAlarmType")
.build();

MqttMessage message = new MqttMessage(proto.toByteArray());
message.setQos(qos);
sampleClient.publish(topic2, message);

System.out.println("Message published");
sampleClient.disconnect();
sampleClient.close();
System.out.println("Disconnected");
// System.exit(0);

} catch (MqttException me) {
System.out.println("Exception:" + me.getMessage());
me.printStackTrace();
}

String topic = "protobuf/alarm";
System.out.println("Connecting to broker: ssl://" + broker_host + ":" + broker_port);
testClient.connect();

System.out.println("Publishing message: :::");

testClient.connect();

System.out.println("Publishing message: :::");

InternalCustomAlarmOuter.InternalCustomAlarm proto = InternalCustomAlarm.newBuilder()
.setExternalIdType("c8y_Serial")
.setExternalId("berlin_01")
.setTxt("Dummy Text")
.setTimestamp(System.currentTimeMillis())
.setAlarmType("c8y_ProtobufAlarmType")
.build();
Mqtt3AsyncClient sampleClientAsync = testClient.toAsync();
sampleClientAsync.publishWith().topic(topic).qos(MqttQos.AT_LEAST_ONCE).payload(proto.toByteArray()).send();

System.out.println("Message published");
testClient.disconnect();
System.out.println("Disconnected");

}

}

0 comments on commit f5dbde4

Please sign in to comment.