Skip to content

Commit

Permalink
Adding working subcribe using async client
Browse files Browse the repository at this point in the history
  • Loading branch information
ck-c8y authored and switschel committed Mar 15, 2024
1 parent 7eba6bb commit 1bf3119
Showing 1 changed file with 22 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ tenant, isConnected(),
+ mqttPort;
// mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new
// MemoryPersistence());

Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder();
Complete simpleAuthComplete = null;
if (!StringUtils.isEmpty(user)) {
Expand Down Expand Up @@ -397,7 +396,7 @@ public boolean isConnected() {

@Override
public void disconnect() {
log.info("Tenant {} - Diconnecting from MQTT broker: {}", tenant,
log.info("Tenant {} - Disconnecting from MQTT broker: {}", tenant,
(mqttClient == null ? null : mqttClient.getConfig().getServerHost()));
try {
if (isConnected()) {
Expand Down Expand Up @@ -440,39 +439,27 @@ public void subscribe(String topic, Integer qos) throws ConnectorException {
log.debug("Tenant {} - Subscribing on topic: {}", tenant, topic);
Mqtt3SubAck subAck = null;
sendSubscriptionEvents(topic, "Subscribing");
// if (qos != null) {
// Mqtt3BlockingClient.Mqtt3Publishes publishes =
// mqttClient.subscribeWith().addSubscription().;
// Mqtt3Subscribe subscribe = Mqtt3Subscribe.builder()
// .addSubscription(Mqtt3Subscription.builder()
// .
// .topicFilter("topic").qos(MqttQos.AT_LEAST_ONCE)
// .build())
// .build();

// Mqtt3Subscribe sub =
// Mqtt3Subscribe.builder().topicFilter(topic).qos(MqttQos.fromCode(qos)).build();
// subAck = mqttClient.subscribe(sub);
// .callback(publish -> {
// boolean success = false;

// // Some logic & conditions

// if (success) {
// publish.acknowledge(); // Conditionally acknowledge the message
// }
// })
// .send().join();
// ack = mqttClient.subscribe(topic, qos);

// }

// else
// ack = mqttClient.subscribe(topic);
// log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
// if (!subAck..equals(Mqtt3ConnAckReturnCode.SUCCESS)) {
// throw new ConnectorException(ack.getReturnCode().toString());
// }
if (qos != null) {
//We don't need to add a handler on subscribe using hive client
mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send();
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> {
log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage());
return null;
});
} else {
//We don't need to add a handler on subscribe using hive client
mqttClient.subscribeWith().topicFilter(topic).send();
Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync();
asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> {
log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic);
}).exceptionally(throwable -> {
log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage());
return null;
});
}
}

public void unsubscribe(String topic) throws Exception {
Expand Down

0 comments on commit 1bf3119

Please sign in to comment.