Skip to content

Commit

Permalink
Merge pull request #10 from MathewHDYT/master
Browse files Browse the repository at this point in the history
Implement 2 buffers for receiving and sending
  • Loading branch information
imbeacon authored Oct 15, 2024
2 parents 97cc3a5 + ea26c76 commit 4039196
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 77 deletions.
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "git",
"url": "https://github.com/thingsboard/pubsubclient.git"
},
"version": "2.9.4",
"version": "2.10.0",
"exclude": "tests",
"examples": "examples/*/*.ino",
"frameworks": "arduino",
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=TBPubSubClient
version=2.9.4
version=2.10.0
author=ThingsBoard <[email protected]>
maintainer=ThingsBoard Team
sentence=A client library for MQTT messaging.
Expand Down
156 changes: 82 additions & 74 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN
}

PubSubClient::~PubSubClient() {
free(this->buffer);
free(this->receive_buffer);
free(this->send_buffer);
}

boolean PubSubClient::connect(const char *id) {
Expand Down Expand Up @@ -195,9 +196,9 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass

if (result == 1) {
nextMsgId = 1;
// Leave room in the buffer for header and variable length field
// Leave room in the receive_buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
unsigned int j;
size_t j;

#if MQTT_VERSION == MQTT_VERSION_3_1
uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
Expand All @@ -207,7 +208,7 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
#define MQTT_HEADER_VERSION_LENGTH 7
#endif
for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
this->buffer[length++] = d[j];
this->receive_buffer[length++] = d[j];
}

uint8_t v;
Expand All @@ -227,30 +228,30 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
v = v|(0x80>>1);
}
}
this->buffer[length++] = v;
this->receive_buffer[length++] = v;

this->buffer[length++] = ((this->keepAlive) >> 8);
this->buffer[length++] = ((this->keepAlive) & 0xFF);
this->receive_buffer[length++] = ((this->keepAlive) >> 8);
this->receive_buffer[length++] = ((this->keepAlive) & 0xFF);

CHECK_STRING_LENGTH(length,id)
length = writeString(id,this->buffer,length);
length = writeString(id,this->receive_buffer,length);
if (willTopic) {
CHECK_STRING_LENGTH(length,willTopic)
length = writeString(willTopic,this->buffer,length);
length = writeString(willTopic,this->receive_buffer,length);
CHECK_STRING_LENGTH(length,willMessage)
length = writeString(willMessage,this->buffer,length);
length = writeString(willMessage,this->receive_buffer,length);
}

if(user != NULL) {
CHECK_STRING_LENGTH(length,user)
length = writeString(user,this->buffer,length);
length = writeString(user,this->receive_buffer,length);
if(pass != NULL) {
CHECK_STRING_LENGTH(length,pass)
length = writeString(pass,this->buffer,length);
length = writeString(pass,this->send_buffer,length);
}
}

write(MQTTCONNECT,this->buffer,length-MQTT_MAX_HEADER_SIZE);
write(MQTTCONNECT,this->receive_buffer,length-MQTT_MAX_HEADER_SIZE);

lastInActivity = lastOutActivity = millis();

Expand All @@ -266,13 +267,13 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass
uint32_t len = readPacket(&llen);

if (len == 4) {
if (buffer[3] == 0) {
if (receive_buffer[3] == 0) {
lastInActivity = millis();
pingOutstanding = false;
_state = MQTT_CONNECTED;
return true;
} else {
_state = buffer[3];
_state = receive_buffer[3];
}
}
_client->stop();
Expand Down Expand Up @@ -311,8 +312,8 @@ boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){

uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
uint16_t len = 0;
if(!readByte(this->buffer, &len)) return 0;
bool isPublish = (this->buffer[0]&0xF0) == MQTTPUBLISH;
if(!readByte(this->receive_buffer, &len)) return 0;
bool isPublish = (this->receive_buffer[0]&0xF0) == MQTTPUBLISH;
uint32_t multiplier = 1;
uint32_t length = 0;
uint8_t digit = 0;
Expand All @@ -327,19 +328,19 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
return 0;
}
if(!readByte(&digit)) return 0;
this->buffer[len++] = digit;
this->receive_buffer[len++] = digit;
length += (digit & 127) * multiplier;
multiplier <<=7; //multiplier *= 128
} while ((digit & 128) != 0);
*lengthLength = len-1;

if (isPublish) {
// Read in topic length to calculate bytes to skip over for Stream writing
if(!readByte(this->buffer, &len)) return 0;
if(!readByte(this->buffer, &len)) return 0;
skip = (this->buffer[*lengthLength+1]<<8)+this->buffer[*lengthLength+2];
if(!readByte(this->receive_buffer, &len)) return 0;
if(!readByte(this->receive_buffer, &len)) return 0;
skip = (this->receive_buffer[*lengthLength+1]<<8)+this->receive_buffer[*lengthLength+2];
start = 2;
if (this->buffer[0]&MQTTQOS1) {
if (this->receive_buffer[0]&MQTTQOS1) {
// skip message id
skip += 2;
}
Expand All @@ -355,7 +356,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) {
}

if (len < this->bufferSize) {
this->buffer[len] = digit;
this->receive_buffer[len] = digit;
len++;
}
idx++;
Expand All @@ -381,41 +382,41 @@ boolean PubSubClient::loop_read() {
}
unsigned long t = millis();
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
uint8_t type = receive_buffer[0]&0xF0;

switch(type) {
case MQTTPUBLISH:
{
if (callback) {
const boolean msgId_present = (buffer[0]&0x06) == MQTTQOS1;
const boolean msgId_present = (receive_buffer[0]&0x06) == MQTTQOS1;
const uint16_t tl_offset = llen+1;
const uint16_t tl = (buffer[tl_offset]<<8)+buffer[tl_offset+1]; /* topic length in bytes */
const uint16_t tl = (receive_buffer[tl_offset]<<8)+receive_buffer[tl_offset+1]; /* topic length in bytes */
const uint16_t topic_offset = tl_offset+2;
const uint16_t msgId_offset = topic_offset+tl;
const uint16_t payload_offset = msgId_present ? msgId_offset+2 : msgId_offset;
if ((payload_offset) >= this->bufferSize) {return false;}
if (len < payload_offset) {return false;}
memmove(buffer+topic_offset-1,buffer+topic_offset,tl); /* move topic inside buffer 1 byte to front */
buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+topic_offset-1;
memmove(receive_buffer+topic_offset-1,receive_buffer+topic_offset,tl); /* move topic inside receive_buffer 1 byte to front */
receive_buffer[topic_offset-1+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) receive_buffer+topic_offset-1;
uint8_t *payload;
// msgId only present for QOS>0
if (msgId_present) {
const uint16_t msgId = (buffer[msgId_offset]<<8)+buffer[msgId_offset+1];
payload = buffer+payload_offset;
const uint16_t msgId = (receive_buffer[msgId_offset]<<8)+receive_buffer[msgId_offset+1];
payload = receive_buffer+payload_offset;
callback(topic,payload,len-payload_offset);
if (_client->connected()) {
buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
if (_client->write(buffer,4) != 0) {
receive_buffer[0] = MQTTPUBACK;
receive_buffer[1] = 2;
receive_buffer[2] = (msgId >> 8);
receive_buffer[3] = (msgId & 0xFF);
if (_client->write(receive_buffer,4) != 0) {
lastOutActivity = t;
}
}
} else {
// No msgId
payload = buffer+payload_offset;
payload = receive_buffer+payload_offset;
callback(topic,payload,len-payload_offset);
}
}
Expand All @@ -424,9 +425,9 @@ boolean PubSubClient::loop_read() {
case MQTTPINGREQ:
{
if (_client->connected()) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
receive_buffer[0] = MQTTPINGRESP;
receive_buffer[1] = 0;
_client->write(receive_buffer,2);
}
break;
}
Expand All @@ -451,9 +452,9 @@ boolean PubSubClient::loop() {
_client->stop();
return false;
} else {
buffer[0] = MQTTPINGREQ;
buffer[1] = 0;
if (_client->write(buffer,2) != 0) {
receive_buffer[0] = MQTTPINGREQ;
receive_buffer[1] = 0;
if (_client->write(receive_buffer,2) != 0) {
lastOutActivity = t;
lastInActivity = t;
}
Expand Down Expand Up @@ -483,22 +484,22 @@ boolean PubSubClient::publish(const char* topic, const uint8_t* payload, size_t
// Too long
return false;
}
// Leave room in the buffer for header and variable length field
// Leave room in the send_buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);
length = writeString(topic,this->send_buffer,length);

// Add payload
uint16_t i;
for (i=0;i<plength;i++) {
this->buffer[length++] = payload[i];
this->send_buffer[length++] = payload[i];
}

// Write the header
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
return write(header,this->buffer,length-MQTT_MAX_HEADER_SIZE);
return write(header,this->send_buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
Expand All @@ -510,10 +511,10 @@ boolean PubSubClient::publish_P(const char* topic, const char* payload, boolean
boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_t plength, boolean retained) {
uint8_t llen = 0;
uint8_t digit;
unsigned int rc = 0;
size_t rc = 0;
uint16_t tlen;
unsigned int pos = 0;
unsigned int i;
size_t pos = 0;
size_t i;
uint8_t header;
size_t len;
size_t expectedLength;
Expand All @@ -528,21 +529,21 @@ boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, size_
if (retained) {
header |= 1;
}
this->buffer[pos++] = header;
this->send_buffer[pos++] = header;
len = plength + 2 + tlen;
do {
digit = len & 127; //digit = len %128
len >>= 7; //len = len / 128
if (len > 0) {
digit |= 0x80;
}
this->buffer[pos++] = digit;
this->send_buffer[pos++] = digit;
llen++;
} while(len>0);

pos = writeString(topic,this->buffer,pos);
pos = writeString(topic,this->send_buffer,pos);

rc += _client->write(this->buffer,pos);
rc += _client->write(this->send_buffer,pos);

for (i=0;i<plength;i++) {
rc += _client->write((char)pgm_read_byte_near(payload + i));
Expand All @@ -559,13 +560,13 @@ boolean PubSubClient::beginPublish(const char* topic, size_t plength, boolean re
if (connected()) {
// Send the header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
length = writeString(topic,this->buffer,length);
length = writeString(topic,this->send_buffer,length);
uint8_t header = MQTTPUBLISH;
if (retained) {
header |= 1;
}
size_t hlen = buildHeader(header, this->buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(this->buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
size_t hlen = buildHeader(header, this->send_buffer, plength+length-MQTT_MAX_HEADER_SIZE);
uint16_t rc = _client->write(this->send_buffer+(MQTT_MAX_HEADER_SIZE-hlen),length-(MQTT_MAX_HEADER_SIZE-hlen));
lastOutActivity = millis();
return (rc == (length-(MQTT_MAX_HEADER_SIZE-hlen)));
}
Expand Down Expand Up @@ -651,17 +652,17 @@ boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
return false;
}
if (connected()) {
// Leave room in the buffer for header and variable length field
// Leave room in the send_buffer for header and variable length field
uint16_t length = MQTT_MAX_HEADER_SIZE;
nextMsgId++;
if (nextMsgId == 0) {
nextMsgId = 1;
}
this->buffer[length++] = (nextMsgId >> 8);
this->buffer[length++] = (nextMsgId & 0xFF);
length = writeString((char*)topic, this->buffer,length);
this->buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
this->send_buffer[length++] = (nextMsgId >> 8);
this->send_buffer[length++] = (nextMsgId & 0xFF);
length = writeString((char*)topic, this->send_buffer,length);
this->send_buffer[length++] = qos;
return write(MQTTSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}
Expand All @@ -681,18 +682,18 @@ boolean PubSubClient::unsubscribe(const char* topic) {
if (nextMsgId == 0) {
nextMsgId = 1;
}
this->buffer[length++] = (nextMsgId >> 8);
this->buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, this->buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->buffer,length-MQTT_MAX_HEADER_SIZE);
this->send_buffer[length++] = (nextMsgId >> 8);
this->send_buffer[length++] = (nextMsgId & 0xFF);
length = writeString(topic, this->send_buffer,length);
return write(MQTTUNSUBSCRIBE|MQTTQOS1,this->send_buffer,length-MQTT_MAX_HEADER_SIZE);
}
return false;
}

void PubSubClient::disconnect() {
this->buffer[0] = MQTTDISCONNECT;
this->buffer[1] = 0;
_client->write(this->buffer,2);
this->send_buffer[0] = MQTTDISCONNECT;
this->send_buffer[1] = 0;
_client->write(this->send_buffer,2);
_state = MQTT_DISCONNECTED;
_client->flush();
_client->stop();
Expand Down Expand Up @@ -775,17 +776,24 @@ boolean PubSubClient::setBufferSize(uint16_t size) {
return false;
}
if (this->bufferSize == 0) {
this->buffer = (uint8_t*)malloc(size);
this->receive_buffer = (uint8_t*)malloc(size);
this->send_buffer = (uint8_t*)malloc(size);
} else {
uint8_t* newBuffer = (uint8_t*)realloc(this->buffer, size);
uint8_t* newBuffer = (uint8_t*)realloc(this->receive_buffer, size);
if (newBuffer != NULL) {
this->buffer = newBuffer;
this->receive_buffer = newBuffer;
} else {
return false;
}
newBuffer = (uint8_t*)realloc(this->send_buffer, size);
if (newBuffer != NULL) {
this->send_buffer = newBuffer;
} else {
return false;
}
}
this->bufferSize = size;
return (this->buffer != NULL);
return (this->receive_buffer != NULL) && (this->send_buffer != NULL);
}

uint16_t PubSubClient::getBufferSize() {
Expand Down
Loading

0 comments on commit 4039196

Please sign in to comment.