Skip to content

Commit

Permalink
Fixed issue around swallowing mqtt publish errors.
Browse files Browse the repository at this point in the history
Signed-off-by: Suneet Nangia <[email protected]>
  • Loading branch information
suneetnangia committed Mar 11, 2024
1 parent a8d509a commit ca6b061
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion crates/outbound-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,24 @@ impl v2::HostConnection for OutboundMqtt {
qos: Qos,
) -> Result<Result<(), Error>> {
Ok(async {
let (client, _) = self.get_conn(connection).await.map_err(other_error)?;
let (client, eventloop) = self.get_conn(connection).await.map_err(other_error)?;
let qos = convert_to_mqtt_qos_value(qos);

// Message published to EventLoop (not MQTT Broker)
client
.publish_bytes(topic, qos, false, payload.into())
.await
.map_err(other_error)?;

// Poll EventLoop once to send the message to MQTT broker or capture/throw error
// We may revisit this later to manage long running connections and their issues in the connection pool.
eventloop
.poll()
.await
.map_err(|err: rumqttc::ConnectionError| {
v2::Error::ConnectionFailed(err.to_string())
})?;

Ok(())
}
.await)
Expand Down

0 comments on commit ca6b061

Please sign in to comment.