From 1bf3119bd5212c41d63452fa528ecc0fa20cc6f8 Mon Sep 17 00:00:00 2001 From: sagIoTPower Date: Tue, 5 Mar 2024 15:46:24 +0100 Subject: [PATCH] Adding working subcribe using async client --- .../mapping/connector/mqtt/MQTTClient.java | 57 +++++++------------ 1 file changed, 22 insertions(+), 35 deletions(-) diff --git a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java index 402e7ca7..be14f1f4 100644 --- a/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java +++ b/dynamic-mapping-service/src/main/java/dynamic/mapping/connector/mqtt/MQTTClient.java @@ -246,7 +246,6 @@ tenant, isConnected(), + mqttPort; // mqttClient = new MqttClient(broker, MqttClient.generateClientId(), new // MemoryPersistence()); - Mqtt3SimpleAuthBuilder simpleAuthBuilder = Mqtt3SimpleAuth.builder(); Complete simpleAuthComplete = null; if (!StringUtils.isEmpty(user)) { @@ -397,7 +396,7 @@ public boolean isConnected() { @Override public void disconnect() { - log.info("Tenant {} - Diconnecting from MQTT broker: {}", tenant, + log.info("Tenant {} - Disconnecting from MQTT broker: {}", tenant, (mqttClient == null ? null : mqttClient.getConfig().getServerHost())); try { if (isConnected()) { @@ -440,39 +439,27 @@ public void subscribe(String topic, Integer qos) throws ConnectorException { log.debug("Tenant {} - Subscribing on topic: {}", tenant, topic); Mqtt3SubAck subAck = null; sendSubscriptionEvents(topic, "Subscribing"); - // if (qos != null) { - // Mqtt3BlockingClient.Mqtt3Publishes publishes = - // mqttClient.subscribeWith().addSubscription().; - // Mqtt3Subscribe subscribe = Mqtt3Subscribe.builder() - // .addSubscription(Mqtt3Subscription.builder() - // . - // .topicFilter("topic").qos(MqttQos.AT_LEAST_ONCE) - // .build()) - // .build(); - - // Mqtt3Subscribe sub = - // Mqtt3Subscribe.builder().topicFilter(topic).qos(MqttQos.fromCode(qos)).build(); - // subAck = mqttClient.subscribe(sub); - // .callback(publish -> { - // boolean success = false; - - // // Some logic & conditions - - // if (success) { - // publish.acknowledge(); // Conditionally acknowledge the message - // } - // }) - // .send().join(); - // ack = mqttClient.subscribe(topic, qos); - - // } - - // else - // ack = mqttClient.subscribe(topic); - // log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); - // if (!subAck..equals(Mqtt3ConnAckReturnCode.SUCCESS)) { - // throw new ConnectorException(ack.getReturnCode().toString()); - // } + if (qos != null) { + //We don't need to add a handler on subscribe using hive client + mqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send(); + Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync(); + asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> { + log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); + }).exceptionally(throwable -> { + log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage()); + return null; + }); + } else { + //We don't need to add a handler on subscribe using hive client + mqttClient.subscribeWith().topicFilter(topic).send(); + Mqtt3AsyncClient asyncMqttClient = mqttClient.toAsync(); + asyncMqttClient.subscribeWith().topicFilter(topic).qos(MqttQos.fromCode(qos)).send().thenRun(() -> { + log.debug("Tenant {} - Successfully subscribed on topic: {}", tenant, topic); + }).exceptionally(throwable -> { + log.error("Tenant {} - Failed to subscribe on topic {} with error: ",tenant,topic,throwable.getMessage()); + return null; + }); + } } public void unsubscribe(String topic) throws Exception {