Skip to content

Commit

Permalink
Add support for MQTT retain flag
Browse files Browse the repository at this point in the history
add retain flag to constructor
add setRetain function
  • Loading branch information
rjwats authored Apr 1, 2022
1 parent c6a3756 commit baae203
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions lib/framework/MqttPubSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,20 @@ class MqttPub : virtual public MqttConnector<T> {
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const String& pubTopic = "",
bool retain = false,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize), _stateReader(stateReader), _pubTopic(pubTopic) {
MqttConnector<T>(statefulService, mqttClient, bufferSize),
_stateReader(stateReader),
_pubTopic(pubTopic),
_retain(retain) {
MqttConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false);
}

void setRetain(const bool retain) {
_retain = retain;
publish();
}

void setPubTopic(const String& pubTopic) {
_pubTopic = pubTopic;
publish();
Expand All @@ -51,6 +60,7 @@ class MqttPub : virtual public MqttConnector<T> {
private:
JsonStateReader<T> _stateReader;
String _pubTopic;
bool _retain;

void publish() {
if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
Expand All @@ -64,7 +74,7 @@ class MqttPub : virtual public MqttConnector<T> {
serializeJson(json, payload);

// publish the payload
MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str());
MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, _retain, payload.c_str());
}
}
};
Expand Down Expand Up @@ -145,9 +155,10 @@ class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
AsyncMqttClient* mqttClient,
const String& pubTopic = "",
const String& subTopic = "",
bool retain = false,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize),
MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, bufferSize),
MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, retain, bufferSize),
MqttSub<T>(stateUpdater, statefulService, mqttClient, subTopic, bufferSize) {
}

Expand Down

0 comments on commit baae203

Please sign in to comment.