diff --git a/mqttclient/RyanMqttClient.h b/mqttclient/RyanMqttClient.h index b35cf0d..9b2183c 100644 --- a/mqttclient/RyanMqttClient.h +++ b/mqttclient/RyanMqttClient.h @@ -92,26 +92,27 @@ extern "C" typedef struct { - uint8_t lwtFlag : 1; // 遗嘱标志位 - uint8_t destoryFlag : 1; // 销毁标志位 - uint16_t ackHandlerCount; // 等待ack的记录个数 - uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符 - uint32_t eventFlag; // 事件标志位 - RyanMqttState_e clientState; // mqtt客户端的状态 - RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器) - RyanList_t ackHandlerList; // 维护ack链表 - RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表 - platformTimer_t keepaliveTimer; // 保活定时器 - platformTimer_t keepaliveDebounTimer; // 保活定时器消抖 - platformNetwork_t network; // 网络组件 - RyanMqttClientConfig_t config; // mqtt config - platformThread_t mqttThread; // mqtt线程 - platformMutex_t msgHandleLock; // msg链表锁 - platformMutex_t ackHandleLock; // ack链表锁 - platformMutex_t userAckHandleLock; // 用户接口的ack链表锁 - platformMutex_t sendBufLock; // 写缓冲区锁 - platformCritical_t criticalLock; // 临界区锁 - lwtOptions_t lwtOptions; // 遗嘱相关配置 + uint8_t lwtFlag : 1; // 遗嘱标志位 + uint8_t destoryFlag : 1; // 销毁标志位 + uint16_t ackHandlerCount; // 等待ack的记录个数 + uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符 + uint32_t eventFlag; // 事件标志位 + RyanMqttState_e clientState; // mqtt客户端的状态 + RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器) + RyanList_t ackHandlerList; // 维护ack链表 + RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表 + platformTimer_t ackScanThrottleTimer; // ack链表检查节流定时器 + platformTimer_t keepaliveTimer; // 保活定时器 + platformTimer_t keepaliveThrottleTimer; // 保活检查节流定时器 + platformNetwork_t network; // 网络组件 + RyanMqttClientConfig_t config; // mqtt config + platformThread_t mqttThread; // mqtt线程 + platformMutex_t msgHandleLock; // msg链表锁 + platformMutex_t ackHandleLock; // ack链表锁 + platformMutex_t userAckHandleLock; // 用户接口的ack链表锁 + platformMutex_t sendBufLock; // 写缓冲区锁 + platformCritical_t criticalLock; // 临界区锁 + lwtOptions_t lwtOptions; // 遗嘱相关配置 } RyanMqttClient_t; /* extern variables-----------------------------------------------------------*/ diff --git a/mqttclient/RyanMqttThread.c b/mqttclient/RyanMqttThread.c index 62b1d9f..e9cc378 100644 --- a/mqttclient/RyanMqttThread.c +++ b/mqttclient/RyanMqttThread.c @@ -25,13 +25,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client) RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout; RyanMqttError_e result = RyanMqttFailedError; int32_t packetLen = 0; + uint32_t timeRemain = 0; RyanMqttAssert(NULL != client); // mqtt没有连接就退出 if (RyanMqttConnectState != RyanMqttGetClientState(client)) return RyanMqttNotConnectError; - uint32_t timeRemain = platformTimerRemain(&client->keepaliveTimer); + timeRemain = platformTimerRemain(&client->keepaliveTimer); // 超过设置的 1.4 倍心跳周期 if (0 == timeRemain) @@ -46,8 +47,8 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client) else if (timeRemain < client->config.recvTimeout || timeRemain < 1000 * 0.5 * client->config.keepaliveTimeoutS) { - // 消抖时间内不发送心跳包 - if (platformTimerRemain(&client->keepaliveDebounTimer)) + // 节流时间内不发送心跳报文 + if (platformTimerRemain(&client->keepaliveThrottleTimer)) return RyanMqttSuccessError; platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁 @@ -62,7 +63,7 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client) }); platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁 - platformTimerCutdown(&client->keepaliveDebounTimer, 1500); // 启动心跳消抖定时器 + platformTimerCutdown(&client->keepaliveThrottleTimer, 1500); // 启动心跳检查节流定时器 } return RyanMqttSuccessError; @@ -557,6 +558,10 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla if (RyanMqttConnectState != RyanMqttGetClientState(client)) return; + // 节流时间内不检查ack链表 + if (platformTimerRemain(&client->ackScanThrottleTimer)) + return; + platformMutexLock(client->config.userData, &client->ackHandleLock); RyanListForEachSafe(curr, next, &client->ackHandlerList) { @@ -623,8 +628,9 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla } } } - platformMutexUnLock(client->config.userData, &client->ackHandleLock); + + platformTimerCutdown(&client->ackScanThrottleTimer, 1000); // 启动ack scan节流定时器 } /**