From 9b05ba020ec3b9f05858fe1df0ba4f03c12b9ebd Mon Sep 17 00:00:00 2001 From: jdpayne1969 Date: Mon, 18 Jul 2016 14:50:04 -0700 Subject: [PATCH 1/3] Fixed bug where [clientid] was not being replaced with topic name. --- rsmb/src/MQTTSProtocol.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rsmb/src/MQTTSProtocol.c b/rsmb/src/MQTTSProtocol.c index 049dc68..f832bd3 100644 --- a/rsmb/src/MQTTSProtocol.c +++ b/rsmb/src/MQTTSProtocol.c @@ -972,9 +972,7 @@ int MQTTSProtocol_handleSubscribes(void* pack, int sock, char* clientAddr, Clien else if (sub->flags.topicIdType == MQTTS_TOPIC_TYPE_PREDEFINED && client != NULL && sub->topicId != 0) { char *predefinedTopicName = MQTTSProtocol_getPreDefinedTopicName(client, sub->topicId) ; - // copy the topic name as it will be freed by subscription engine - topicName = malloc(strlen(predefinedTopicName)+1); - strcpy(topicName, predefinedTopicName); + topicName = MQTTSProtocol_replaceTopicNamePlaceholders(client, predefinedTopicName); topicId = sub->topicId; } From 67697703fbf908f53eb41aaf57c3dba9425f5118 Mon Sep 17 00:00:00 2001 From: jdpayne1969 Date: Thu, 10 Nov 2016 09:40:21 -0800 Subject: [PATCH 2/3] Fixed publish of retained messages, missing prefix Retained messages that where published before a bridge is established where published to the bridge without the defined prefix. --- rsmb/src/MQTTProtocol.c | 60 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/rsmb/src/MQTTProtocol.c b/rsmb/src/MQTTProtocol.c index c939a3f..7b53b0e 100644 --- a/rsmb/src/MQTTProtocol.c +++ b/rsmb/src/MQTTProtocol.c @@ -876,6 +876,10 @@ int MQTTProtocol_handleDisconnects(void* pack, int sock, Clients* client) */ void MQTTProtocol_processRetaineds(Clients* client, char* topic, int qos, int priority) { + // NEW + Messages* stored = NULL; /* to avoid duplication of data where possible */ + int clean_needed = 0; + // NEW List* rpl = NULL; ListElement* currp = NULL; #if defined(QOS0_SEND_LIMIT) @@ -887,27 +891,69 @@ void MQTTProtocol_processRetaineds(Clients* client, char* topic, int qos, int pr while (ListNextElement(rpl, &currp)) { int curqos; - Publish publish; + Publish lPublish; Messages* p = NULL; RetainedPublications* rp = (RetainedPublications*)(currp->content); - publish.payload = rp->payload; - publish.payloadlen = rp->payloadlen; - publish.topic = rp->topicName; + lPublish.payload = rp->payload; + lPublish.payloadlen = rp->payloadlen; + lPublish.topic = rp->topicName; curqos = (rp->qos < qos) ? rp->qos : qos; #if defined(QOS0_SEND_LIMIT) if (curqos == 0) ++qos0count; if (qos0count > bstate->max_inflight_messages) /* a somewhat arbitrary criterion */ { - if (MQTTProtocol_queuePublish(client, &publish, curqos, 1, priority, &p) == SOCKET_ERROR) + if (MQTTProtocol_queuePublish(client, &lPublish, curqos, 1, priority, &p) == SOCKET_ERROR) break; } else { #endif - if (Protocol_startOrQueuePublish(client, &publish, curqos, 1, priority, &p) == SOCKET_ERROR) - break; + // NEW + if (client) + { + Publish *publish = &lPublish; +// Clients* pubclient = (Clients*)(curnode->content); + Clients* pubclient = client; + int retained = 0; + Messages* saved = NULL; + char* original_topic = publish->topic; + +#if !defined(NO_BRIDGE) + if (pubclient->outbound || pubclient->noLocal) + { + retained = publish->header.bits.retain; /* outbound and noLocal mean outward/inward bridge client, + so keep retained flag */ + if (pubclient->outbound) + { + Bridge_handleOutbound(pubclient, publish); + if (publish->topic != original_topic) + { /* handleOutbound has changed the topic, so we musn't used the stored pub which + contains the original topic */ + saved = stored; + stored = NULL; + } + } + } +#endif + if (Protocol_startOrQueuePublish(pubclient, publish, qos, retained, priority, &stored) == SOCKET_ERROR) + { + pubclient->good = pubclient->connected = 0; /* flag this client as needing to be cleaned up */ + clean_needed = 1; + } + if (publish->topic != original_topic) + { + stored = saved; /* restore the stored pointer for the next loop iteration */ + free(publish->topic); + publish->topic = original_topic; + } + } + // NEW + + // OLD + //if (Protocol_startOrQueuePublish(client, &publish, curqos, 1, priority, &p) == SOCKET_ERROR) + // break; #if defined(QOS0_SEND_LIMIT) } #endif From 36bd1aef6c6f72e56a880dbaba799985a9ebc9dd Mon Sep 17 00:00:00 2001 From: jdpayne1969 Date: Fri, 3 Mar 2017 11:22:18 -0800 Subject: [PATCH 3/3] Fixed publish bug where messages are larger than 255 --- rsmb/src/MQTTSPacket.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rsmb/src/MQTTSPacket.c b/rsmb/src/MQTTSPacket.c index a3ee308..2665eba 100644 --- a/rsmb/src/MQTTSPacket.c +++ b/rsmb/src/MQTTSPacket.c @@ -193,7 +193,7 @@ void* MQTTSPacket_Factory(int sock, char** clientAddr, struct sockaddr* from, ui data = MQTTSPacket_parse_header( &header, data ) ; /* In case of Forwarder Encapsulation packet, Length: 1-octet long, specifies the number of octets up to the end - * of the “Wireless Node Id” field (incl. the Length octet itself). Length does not include length of payload + * of the �Wireless Node Id� field (incl. the Length octet itself). Length does not include length of payload * (encapsulated MQTT-SN message itself). */ if (header.type != MQTTS_FRWDENCAP && header.len != n) @@ -240,7 +240,7 @@ char* MQTTSPacket_parse_header( MQTTSHeader* header, char* data ) { /* The Length field is either 1- or 3-octet long and specifies the total number of octets contained in * the message (including the Length field itself). - * If the first octet of the Length field is coded “0x01” then the Length field is 3-octet long; in this + * If the first octet of the Length field is coded �0x01� then the Length field is 3-octet long; in this * case, the two following octets specify the total number of octets of the message (most-significant * octet first). Otherwise, the * Length field is only 1-octet long and specifies itself the total number of octets contained in the @@ -448,9 +448,11 @@ void* MQTTSPacket_publish(MQTTSHeader header, char* data) char* enddata = &data[header.len - 2]; int topicLen = 0; int datalen = 0; + int headerlen; FUNC_ENTRY; //printf("publish header.len %d\n", header.len); + headerlen = (header.len > 255) ? 9 : 7; pack = malloc(sizeof(MQTTS_Publish)); pack->header = header; pack->flags.all = readChar(&curdata); @@ -477,13 +479,13 @@ void* MQTTSPacket_publish(MQTTSHeader header, char* data) pack->msgId = readInt(&curdata); if (pack->flags.topicIdType == MQTTS_TOPIC_TYPE_NORMAL && pack->flags.QoS == 3) { - datalen = header.len - 7 - topicLen; + datalen = header.len - headerlen - topicLen; memcpy(pack->shortTopic, curdata, topicLen); pack->shortTopic[topicLen] = '\0'; curdata += topicLen; } else - datalen = header.len - 7; + datalen = header.len - headerlen; pack->data = malloc(datalen); memcpy(pack->data, curdata, datalen); pack->dataLen = datalen;