Skip to content

Commit

Permalink
Add MQTT logging
Browse files Browse the repository at this point in the history
Forward messages to MQTT broker
  • Loading branch information
jbaudoux committed Mar 31, 2024
1 parent 7b851c5 commit 07f528e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 94 deletions.
66 changes: 34 additions & 32 deletions BSB_LAN/BSB_LAN.ino
Original file line number Diff line number Diff line change
Expand Up @@ -2886,14 +2886,18 @@ void generateJSONwithConfig() {
* Pass parameters:
* byte *msg pointer to the telegram buffer
* Parameters passed back:
* none
* char * pvalstr
* Function value returned:
* none
* Global resources used:
* log_parameters
* *************************************************************** */
void LogTelegram(byte* msg) {
if (!(logTelegram & LOGTELEGRAM_ON) || !(LoggingMode & CF_LOGMODE_SD_CARD)) return;
void LogTelegram(byte* msg, float query_line) {
printTelegram(msg, query_line);
// printTelegram has populated decodedTelegram
if (!(logTelegram & LOGTELEGRAM_ON)) return;
//if !(LoggingMode & CF_LOGMODE_SD_CARD)) return;
if (!(LoggingMode & (CF_LOGMODE_SD_CARD | CF_LOGMODE_MQTT))) return;
File dataFile;
uint32_t cmd;
int i=0; // begin with line 0
Expand All @@ -2906,11 +2910,15 @@ void LogTelegram(byte* msg) {
int data_len;
float dval;
float line = 0;
if (LoggingMode & CF_LOGMODE_SD_CARD) {
#if !defined(ESP32)
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
if (SDCard.vol()->freeClusterCount() < minimum_SD_size) return;
#else
if (totalBytes()-usedBytes() < minimum_SD_size) return;
if (totalBytes()-usedBytes() < minimum_SD_size) return;
#endif
dataFile = SDCard.open(journalFileName, FILE_APPEND);
if (!dataFile) return;
}

if (bus->getBusType() != BUS_PPS) {
if (msg[4+(bus->getBusType()*4)]==TYPE_QUR || msg[4+(bus->getBusType()*4)]==TYPE_SET || (((msg[2]!=ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]<0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF)) { //QUERY and SET: byte 5 and 6 are in reversed order
Expand Down Expand Up @@ -2956,8 +2964,10 @@ void LogTelegram(byte* msg) {
default: logThis = false; break;
}
if (logThis) {
dataFile = SDCard.open(journalFileName, FILE_APPEND);
if (dataFile) {
if (LoggingMode & CF_LOGMODE_MQTT) {
mqtt_sendtoBroker();
}
if (LoggingMode & CF_LOGMODE_SD_CARD) {
int outBufLen = 0;
outBufLen += sprintf_P(outBuf, PSTR("%lu;%s;"), millis(), GetDateTime(outBuf + outBufLen + 80));
if (!known) { // no hex code match
Expand Down Expand Up @@ -3028,9 +3038,11 @@ void LogTelegram(byte* msg) {
}
strcat_P(outBuf + outBufLen, PSTR("\r\n"));
dataFile.print(outBuf);
dataFile.close();
}
}
if (LoggingMode & CF_LOGMODE_SD_CARD) {
dataFile.close();
}
}

#define MAX_PARAM_LEN 22
Expand Down Expand Up @@ -3619,16 +3631,14 @@ int set(float line // the ProgNr of the heater parameter

// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// no answer for TYPE_INF
if (t!=TYPE_SET) return 1;

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg);
LogTelegram(msg, line);
// Expect an acknowledgement to our SET telegram
if (msg[4+(bus->getBusType()*4)]!=TYPE_ACK) { // msg type at 4 (BSB) or 8 (LPB)
printlnToDebug(PSTR("set failed NACK"));
Expand Down Expand Up @@ -3672,13 +3682,11 @@ int queryDefaultValue(float line, byte *msg, byte *tx_msg) {
} else {
// Decode the xmit telegram and send it to the debug interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the debug interface
printTelegram(msg, line); // send to debug interface
LogTelegram(msg);
LogTelegram(msg, line);
}
}
return 1;
Expand Down Expand Up @@ -4142,16 +4150,14 @@ void query(float line) { // line (ProgNr)
if (bus->Send(query_type, c, msg, tx_msg) == BUS_OK) {
// Decode the xmit telegram and send it to the PC serial interface
if (verbose) {
printTelegram(tx_msg, line);
LogTelegram(tx_msg);
LogTelegram(tx_msg, line);
}

// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, line);
LogTelegram(msg, line);
printFmtToDebug(PSTR("#%g: "), line);
printlnToDebug(build_pvalstr(0));
SerialOutput->flush();
LogTelegram(msg);
break; // success, break out of while loop
} else {
printlnToDebug(printError(261)); //query failed
Expand Down Expand Up @@ -4721,7 +4727,7 @@ void loop() {
if (monitor) {
busmsg=bus->Monitor(msg);
if (busmsg==true) {
LogTelegram(msg);
LogTelegram(msg, -1);
}
}
if (!monitor || busmsg == true) {
Expand All @@ -4730,8 +4736,7 @@ void loop() {
if (bus->GetMessage(msg) || busmsg == true) { // message was syntactically correct
// Decode the rcv telegram and send it to the PC serial interface
if (verbose && bus->getBusType() != BUS_PPS && !monitor) { // verbose output for PPS comes later
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Is this a broadcast message?
Expand Down Expand Up @@ -5404,11 +5409,9 @@ void loop() {
} else {
if (msg[4+(bus->getBusType()*4)]!=TYPE_ERR) {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
if (decodedTelegram.msg_type != TYPE_ERR) { //pvalstr[0]<1 - unknown command
my_dev_fam = temp_dev_fam;
my_dev_var = temp_dev_var;
Expand Down Expand Up @@ -5550,12 +5553,10 @@ void loop() {
printlnToDebug(PSTR("bus send failed")); // to PC hardware serial I/F
} else {
// Decode the xmit telegram and send it to the PC serial interface
printTelegram(tx_msg, -1);
LogTelegram(tx_msg);
LogTelegram(tx_msg, -1);
}
// Decode the rcv telegram and send it to the PC serial interface
printTelegram(msg, -1); // send to hardware serial interface
LogTelegram(msg);
LogTelegram(msg, -1);
// TODO: replace pvalstr with data from decodedTelegram structure
build_pvalstr(1);
if (outBuf[0]>0) {
Expand Down Expand Up @@ -6693,7 +6694,8 @@ next_parameter:
my_dev_var = save_my_dev_var;
}
}
mqtt_sendtoBroker(log_parameters[i]); //Luposoft, put whole unchanged code in new function mqtt_sendtoBroker to use it at other points as well
query(log_parameters[i].number);
mqtt_sendtoBroker(); //Luposoft, put whole unchanged code in new function mqtt_sendtoBroker to use it at other points as well
}
}
if (destAddr != d_addr) {
Expand Down
3 changes: 1 addition & 2 deletions BSB_LAN/include/broadcast_msg_handling.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ void broadcast_msg_handling(byte *msg){
if (((msg[2]==ADDR_ALL && bus->getBusType()==BUS_BSB) || (msg[2]>=0xF0 && bus->getBusType()==BUS_LPB)) && msg[4+(bus->getBusType()*4)]==TYPE_INF) { // handle broadcast messages
// Decode the rcv telegram and send it to the PC serial interface
if (!verbose && !monitor) { // don't log twice if in verbose mode, but log broadcast messages also in non-verbose mode
printTelegram(msg, -1);
LogTelegram(msg);
LogTelegram(msg, -1);
}

// Filter Brenner Status messages
Expand Down
95 changes: 48 additions & 47 deletions BSB_LAN/include/mqtt_handler.h
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
char *build_pvalstr(bool extended);
unsigned long mqtt_reconnect_timer;

//Luposoft: function mqtt_sendtoBroker
/* Function: mqtt_sendtoBroker()
* Does: send messages to mqtt-broker
* Pass parameters:
* int param
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */

/* Function: mqtt_get_client_id()
* Does: Gets the client ID to use for the MQTT connection based on the set
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* MQTT Device ID, if unset, defaults to "BSB-LAN".
* Pass parameters:
* none
* Function value returned
Expand All @@ -38,7 +23,21 @@ const String mqtt_get_client_id() {
return result;
}

void mqtt_sendtoBroker(parameter param) {
/* Function: mqtt_sendtoBroker()
* Does: Send messages to mqtt-broker
* Pass parameters:
* none
* Parameters passed back:
* none
* Function value returned:
* none
* Global resources used:
* Serial instance
* Ethernet instance
* MQTT instance
*/

void mqtt_sendtoBroker() {
// Declare local variables and start building json if enabled
String MQTTPayload = "";
String MQTTTopic = "";
Expand All @@ -50,7 +49,8 @@ void mqtt_sendtoBroker(parameter param) {
MQTTTopic = "BSB-LAN/";
}

query(param.number);
float prognr = decodedTelegram.prognr;
String param_number = String(prognr, (roundf(prognr * 10) != roundf(prognr) * 10)?1:0);

switch(mqtt_mode)
{
Expand All @@ -59,11 +59,12 @@ void mqtt_sendtoBroker(parameter param) {
// =============================================
case 1:
// use parameter code as sub-topic
MQTTTopic.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTTopic.concat("!");
MQTTTopic.concat(String(param.dest_addr));
}
MQTTTopic.concat(param_number);
// FIXME
// if (param.dest_addr > -1) {
// MQTTTopic.concat("!");
// MQTTTopic.concat(String(param.dest_addr));
// }
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
//---- we really need build_pvalstr(0) or we need decodedTelegram.value or decodedTelegram.enumdescaddr ? ----
//---- yes, because build_pvalstr(0) sends both the value and the description. If only one is needed (I don't know about MQTT users) then we can use one of the other (FH 2.1.2021)
Expand All @@ -82,11 +83,12 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("{\""));
MQTTPayload.concat(mqtt_get_client_id());
MQTTPayload.concat(F("\":{\"status\":{\""));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
}
MQTTPayload.concat(param_number);
// FIXME
// if (param.dest_addr > -1) {
// MQTTPayload.concat("!");
// MQTTPayload.concat(String(param.dest_addr));
// }
MQTTPayload.concat(F("\":\""));
if (decodedTelegram.type == VT_ENUM || decodedTelegram.type == VT_BINARY_ENUM || decodedTelegram.type == VT_ONOFF || decodedTelegram.type == VT_YESNO || decodedTelegram.type == VT_BIT || decodedTelegram.type == VT_ERRORCODE || decodedTelegram.type == VT_DATETIME || decodedTelegram.type == VT_DAYMONTH || decodedTelegram.type == VT_TIME || decodedTelegram.type == VT_WEEKDAY) {
//---- we really need build_pvalstr(0) or we need decodedTelegram.value or decodedTelegram.enumdescaddr ? ----
Expand All @@ -110,11 +112,12 @@ void mqtt_sendtoBroker(parameter param) {
MQTTPayload.concat(F("BSB-LAN"));
}
MQTTPayload.concat(F("\":{\"id\":"));
MQTTPayload.concat(String(param.number, (roundf(param.number * 10) != roundf(param.number) * 10)?1:0));
if (param.dest_addr > -1) {
MQTTPayload.concat("!");
MQTTPayload.concat(String(param.dest_addr));
}
MQTTPayload.concat(param_number);
// FIXME
// if (param.dest_addr > -1) {
// MQTTPayload.concat("!");
// MQTTPayload.concat(String(param.dest_addr));
// }
MQTTPayload.concat(F(",\"name\":\""));
MQTTPayload.concat(decodedTelegram.prognrdescaddr);
MQTTPayload.concat(F("\",\"value\": \""));
Expand Down Expand Up @@ -143,7 +146,7 @@ void mqtt_sendtoBroker(parameter param) {
}

/* Function: mqtt_get_will_topic()
* Does: Constructs the MQTT Will Topic used throught the system
* Does: Constructs the MQTT Will Topic used throught the system
* Pass parameters:
* none
* Function value returned
Expand All @@ -164,10 +167,8 @@ const String mqtt_get_will_topic() {
return MQTTLWTopic;
}

//Luposoft: Funktionen mqtt_connect
/* Function: mqtt_connect()
* Does: connect to mqtt broker
/* Function: mqtt_connect()
* Does: Connect to MQTT broker
* Pass parameters:
* none
* Parameters passed back:
Expand All @@ -178,7 +179,7 @@ const String mqtt_get_will_topic() {
* Serial instance
* Ethernet instance
* MQTT instance
* *************************************************************** */
*/

bool mqtt_connect() {
char* tempstr = (char*)malloc(sizeof(mqtt_broker_addr)); // make a copy of mqtt_broker_addr for destructive strtok operation
Expand Down Expand Up @@ -258,8 +259,8 @@ bool mqtt_connect() {
}

/* Function: mqtt_disconnect()
* Does: Will disconnect from the MQTT Broker if connected.
* Frees accociated resources
* Does: Will disconnect from the MQTT broker if connected.
* Frees associated resources
* Pass parameters:
* none
* Parameters passed back:
Expand Down Expand Up @@ -290,11 +291,10 @@ void mqtt_disconnect() {
}
}

//Luposoft: Funktionen mqtt_callback
/* Function: mqtt_callback()
* Does: will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Does: Will call by MQTTPubSubClient.loop() when incomming mqtt-message from broker
* Example: set <mqtt2Server> publish <MQTTTopicPrefix> S700=1
send command to heater and return an acknowledge to broker
* Pass parameters:
* topic,payload,length
* Parameters passed back:
Expand All @@ -304,7 +304,7 @@ void mqtt_disconnect() {
* Global resources used:
* Serial instance
* Ethernet instance
* *************************************************************** */
*/

void mqtt_callback(char* topic, byte* payload, unsigned int length) {
uint8_t destAddr = bus->getBusDest();
Expand Down Expand Up @@ -361,7 +361,8 @@ void mqtt_callback(char* topic, byte* payload, unsigned int length) {
printFmtToDebug(PSTR("%.1f=%s \r\n"), param.number, C_payload);
set(param.number,C_payload,firstsign=='S'); //command to heater
}
mqtt_sendtoBroker(param); //send mqtt-message
query(param.number);
mqtt_sendtoBroker(); //send mqtt-message
printlnToDebug(PSTR("##MQTT#############################"));
if (param.dest_addr > -1 && destAddr != param.dest_addr) {
bus->setBusType(bus->getBusType(), bus->getBusAddr(), destAddr);
Expand Down
Loading

0 comments on commit 07f528e

Please sign in to comment.