Skip to content

Commit

Permalink
[SERIAL] Improve communication robustness
Browse files Browse the repository at this point in the history
-Add a semaphore to avoid msg overlap
-Clean partial buffer
-Enable erase and restart of the secondary module from the primary
  • Loading branch information
1technophile committed Oct 3, 2024
1 parent fcc09df commit 7186808
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 45 deletions.
2 changes: 1 addition & 1 deletion main/ZgatewayBT.ino
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ void setupBTTasksAndBLE() {
procBLETask, /* Function to implement the task */
"procBLETask", /* Name of the task */
# if defined(USE_ESP_IDF) || defined(USE_BLUFI)
12500,
13500,
# else
8500, /* Stack size in bytes */
# endif
Expand Down
106 changes: 69 additions & 37 deletions main/ZgatewaySERIAL.ino
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,23 @@
SoftwareSerial SERIALSoftSerial(SERIAL_RX_GPIO, SERIAL_TX_GPIO); // RX, TX
# endif

# ifdef ESP32
SemaphoreHandle_t serialSemaphore = NULL;
const TickType_t semaphoreTimeout = pdMS_TO_TICKS(2000); // 1 second timeout
# endif

// use pointer to stream class for serial communication to make code
// compatible with both softwareSerial as hardwareSerial.
Stream* SERIALStream = NULL;
//unsigned long msgCount = 0;
unsigned long msgCount = 0;

bool receiverReady = false;
unsigned long lastHeartbeatReceived = 0;
unsigned long lastHeartbeatAckReceived = 0;
unsigned long lastHeartbeatSent = 0;
unsigned long lastHeartbeatAckReceivedCheck = 0;
const unsigned long heartbeatTimeout = 15000; // 15 seconds timeout for ack
const unsigned long heartbeatAckCheckInterval = 5000; // Check for ack every 5 seconds
const unsigned long maxHeartbeatInterval = 60000; // Maximum interval of 1 minute
unsigned long heartbeatInterval = 5000; // 5 seconds
bool isOverflow = false;
Expand Down Expand Up @@ -102,6 +109,13 @@ void setupSERIAL() {
Log.notice(F("SERIAL_TX_GPIO: %d" CR), SERIAL_TX_GPIO);
# endif

# ifdef ESP32
serialSemaphore = xSemaphoreCreateMutex();
if (serialSemaphore == NULL) {
Log.error(F("Failed to create serialSemaphore" CR));
}
# endif

// Flush all bytes in the "link" serial port buffer
while (SERIALStream->available() > 0)
SERIALStream->read();
Expand Down Expand Up @@ -137,42 +151,59 @@ void SERIALtoX() {

# elif SERIALtoMQTTmode == 1 // Convert received JSON data to one or multiple MQTT topics
void sendHeartbeat() {
Log.trace(F("Sending Serial heartbeat" CR));
SERIALStream->print(SERIALPre);
SERIALStream->print("{\"type\":\"heartbeat\"}");
SERIALStream->print(SERIALPost);
SERIALStream->flush();
if (xSemaphoreTake(serialSemaphore, semaphoreTimeout) == pdTRUE) {
SERIALStream->print(SERIALPre);
SERIALStream->print("{\"type\":\"heartbeat\"}");
SERIALStream->print(SERIALPost);
SERIALStream->flush();
Log.notice(F("Sent Serial heartbeat" CR));
xSemaphoreGive(serialSemaphore);
} else {
Log.error(F("Failed to take serialSemaphore" CR));
}
}

void sendHeartbeatAck() {
SERIALStream->print(SERIALPre);
SERIALStream->print("{\"type\":\"heartbeat_ack\"}");
SERIALStream->print(SERIALPost);
SERIALStream->flush();
Log.trace(F("Sent heartbeat ack" CR));
if (xSemaphoreTake(serialSemaphore, semaphoreTimeout) == pdTRUE) {
SERIALStream->print(SERIALPre);
SERIALStream->print("{\"type\":\"heartbeat_ack\"}");
SERIALStream->print(SERIALPost);
SERIALStream->flush();
Log.notice(F("Sent heartbeat ack" CR));
xSemaphoreGive(serialSemaphore);
} else {
Log.error(F("Failed to take serialSemaphore" CR));
}
}

void SERIALtoX() {
static String buffer = ""; // Static buffer to store incomplete messages

unsigned long currentTime = millis();

# ifdef SENDER_SERIAL_HEARTBEAT
// Check if it's time to send a heartbeat and we're not in overflow
if (!isOverflow && currentTime - lastHeartbeatSent > heartbeatInterval) {
sendHeartbeat();
lastHeartbeatSent = currentTime;
}
if (currentTime - lastHeartbeatAckReceivedCheck > heartbeatAckCheckInterval) {
lastHeartbeatAckReceivedCheck = currentTime;
// Check if we received an ack for the last heartbeat
if (currentTime - lastHeartbeatAckReceived > heartbeatTimeout) {
// No ack received, increase the interval (with a maximum limit)
heartbeatInterval = min(heartbeatInterval * 2, maxHeartbeatInterval);
Log.warning(F("No heartbeat ack received. Increasing interval to %lu ms" CR), heartbeatInterval);
receiverReady = false;
} else {
// Ack received, reset the interval
heartbeatInterval = 5000;
}
sendHeartbeat();
lastHeartbeatSent = currentTime;
}

# else
receiverReady = true;
# endif
while (SERIALStream->available()) {
unsigned long now = millis();
char c = SERIALStream->read();
buffer += c;

Expand All @@ -194,7 +225,8 @@ void SERIALtoX() {
if (SERIALdata.containsKey("type") && strcmp(SERIALdata["type"], "heartbeat") == 0) {
handleHeartbeat();
} else if (SERIALdata.containsKey("type") && strcmp(SERIALdata["type"], "heartbeat_ack") == 0) {
lastHeartbeatAckReceived = currentTime;
lastHeartbeatAckReceived = now;
receiverReady = true;
Log.notice(F("Heartbeat ack received" CR));
} else {
// Process normal messages
Expand Down Expand Up @@ -231,6 +263,10 @@ void SERIALtoX() {

// Clear the buffer for the next message
buffer = "";
} else if (buffer.endsWith(SERIALPost)) {
// If the buffer ends with the postfix but does not start with the prefix, clear it
Log.error(F("Buffer error, clearing buffer. Partial content: %s" CR), buffer.c_str());
buffer = "";
} else if (buffer.length() > JSON_MSG_BUFFER) {
// If the buffer gets too large without finding a complete message, clear it
Log.error(F("Buffer overflow, clearing buffer. Partial content: %s" CR), buffer.c_str());
Expand Down Expand Up @@ -280,23 +316,16 @@ void sendMQTTfromNestedJson(JsonVariant obj, char* topic, int level, int maxLeve

bool XtoSERIAL(const char* topicOri, JsonObject& SERIALdata) {
bool res = false;
unsigned long currentTime = millis();

// Check if receiver is still ready (heartbeat check)
if (currentTime - lastHeartbeatReceived > heartbeatTimeout) {
receiverReady = false;
Log.error(F("Heartbeat timeout. Receiver is not ready." CR));
}

if (receiverReady && (cmpToMainTopic(topicOri, subjectMQTTtoSERIAL) || (SYSConfig.serial && SERIALdata.containsKey("origin") && SERIALdata["origin"].is<const char*>()) || (SYSConfig.serial && SERIALdata.containsKey("topic") && SERIALdata["topic"].is<const char*>()))) {
if (receiverReady && (cmpToMainTopic(topicOri, subjectMQTTtoSERIAL) ||
(SYSConfig.serial && SERIALdata.containsKey("origin") && SERIALdata["origin"].is<const char*>()) ||
(SYSConfig.serial && SERIALdata.containsKey("topic") && SERIALdata["topic"].is<const char*>()))) {
Log.trace(F("XtoSERIAL" CR));

// Prepare the data string
std::string data;
if (SYSConfig.serial ||
(SERIALdata.containsKey("origin") && SERIALdata["origin"].is<const char*>()) || // Module like BT to SERIAL
(SERIALdata.containsKey("target") && SERIALdata["target"].is<const char*>())) { // Command to send to a specific target example MQTTtoBT through SERIAL
//SERIALdata["msgcount"] = msgCount++;
SERIALdata["msgcount"] = msgCount++;
serializeJson(SERIALdata, data);
} else if (SERIALdata.containsKey("value")) {
data = SERIALdata["value"].as<std::string>();
Expand All @@ -305,15 +334,19 @@ bool XtoSERIAL(const char* topicOri, JsonObject& SERIALdata) {
// Send the message
const char* prefix = SERIALdata["prefix"] | SERIALPre;
const char* postfix = SERIALdata["postfix"] | SERIALPost;

SERIALStream->print(prefix);
SERIALStream->print(data.c_str());
SERIALStream->print(postfix);
SERIALStream->flush();

Log.notice(F("[ OMG->SERIAL ] data sent: %s" CR), data.c_str());
res = true;
delay(100);
if (xSemaphoreTake(serialSemaphore, semaphoreTimeout) == pdTRUE) {
SERIALStream->print(prefix);
SERIALStream->print(data.c_str());
SERIALStream->print(postfix);
SERIALStream->flush();

Log.notice(F("[ OMG->SERIAL ] data sent: %s" CR), data.c_str());
res = true;
delay(1000);
xSemaphoreGive(serialSemaphore);
} else {
Log.error(F("Failed to take serialSemaphore" CR));
}
}
return res;
}
Expand All @@ -324,7 +357,6 @@ bool isSerialReady() {

// This function should be called when a heartbeat is received from the receiver
void handleHeartbeat() {
receiverReady = true;
lastHeartbeatReceived = millis();
Log.trace(F("Heartbeat received. Receiver is ready." CR));
sendHeartbeatAck();
Expand Down
21 changes: 14 additions & 7 deletions main/main.ino
Original file line number Diff line number Diff line change
Expand Up @@ -1766,6 +1766,13 @@ void setupTLS(int index) {
9 - SELFTEST end
*/
void ESPRestart(byte reason) {
#ifdef SecondaryModule
// Erase the secondary module config
String restartCmdStr = "{\"cmd\":\"" + String(restartCmd) + "\"}";
Log.notice(F("Restarting secondary module : %s" CR), restartCmdStr.c_str());
receivingDATA(subjectMQTTtoSYSsetSecondaryModule, restartCmdStr.c_str());
delay(2000);
#endif
StaticJsonDocument<128> jsonBuffer;
JsonObject jsondata = jsonBuffer.to<JsonObject>();
jsondata["reason"] = reason;
Expand Down Expand Up @@ -1873,13 +1880,6 @@ void blockingWaitForReset() {
}
# endif
ledManager.setMode(-1, -1, LEDManager::STATIC, LED_WAITING_ONBOARD_COLOR, -1);
# ifdef SecondaryModule
// Erase the secondary module config
String eraseCmdStr = "{\"cmd\":\"" + String(eraseCmd) + "\"}";
Log.notice(F("Erasing secondary module config: %s" CR), eraseCmdStr.c_str());
receivingDATA(subjectMQTTtoSYSsetSecondaryModule, eraseCmdStr.c_str());
delay(5000);
# endif
// Checking if the flash has already been erased to identify if we erase it or go into failsafe mode
// going to failsafe mode is done by doing a long button press from a state where the flash has already been erased
if (SPIFFS.begin()) {
Expand Down Expand Up @@ -2737,6 +2737,13 @@ float intTemperatureRead() {
Erase flash and restart the ESP
*/
void erase(bool restart) {
#ifdef SecondaryModule
// Erase the secondary module config
String eraseCmdStr = "{\"cmd\":\"" + String(eraseCmd) + "\"}";
Log.notice(F("Erasing secondary module config: %s" CR), eraseCmdStr.c_str());
receivingDATA(subjectMQTTtoSYSsetSecondaryModule, eraseCmdStr.c_str());
delay(2000);
#endif
Log.trace(F("Formatting requested, result: %d" CR), SPIFFS.format());

#if defined(ESP8266)
Expand Down

0 comments on commit 7186808

Please sign in to comment.