From ca6b061637b58accccef5752996fe2652df16443 Mon Sep 17 00:00:00 2001 From: Suneet Nangia Date: Mon, 11 Mar 2024 15:04:09 +0000 Subject: [PATCH] Fixed issue around swallowing mqtt publish errors. Signed-off-by: Suneet Nangia --- crates/outbound-mqtt/src/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/outbound-mqtt/src/lib.rs b/crates/outbound-mqtt/src/lib.rs index e0f0106809..ec3118ba89 100644 --- a/crates/outbound-mqtt/src/lib.rs +++ b/crates/outbound-mqtt/src/lib.rs @@ -88,12 +88,24 @@ impl v2::HostConnection for OutboundMqtt { qos: Qos, ) -> Result> { Ok(async { - let (client, _) = self.get_conn(connection).await.map_err(other_error)?; + let (client, eventloop) = self.get_conn(connection).await.map_err(other_error)?; let qos = convert_to_mqtt_qos_value(qos); + + // Message published to EventLoop (not MQTT Broker) client .publish_bytes(topic, qos, false, payload.into()) .await .map_err(other_error)?; + + // Poll EventLoop once to send the message to MQTT broker or capture/throw error + // We may revisit this later to manage long running connections and their issues in the connection pool. + eventloop + .poll() + .await + .map_err(|err: rumqttc::ConnectionError| { + v2::Error::ConnectionFailed(err.to_string()) + })?; + Ok(()) } .await)