Skip to content

Commit

Permalink
perf: ack链表处理函数添加节流措施
Browse files Browse the repository at this point in the history
  • Loading branch information
Ryan-CW-Code committed Sep 25, 2024
1 parent 8ba20e6 commit 53d3452
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
41 changes: 21 additions & 20 deletions mqttclient/RyanMqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,27 @@ extern "C"

typedef struct
{
uint8_t lwtFlag : 1; // 遗嘱标志位
uint8_t destoryFlag : 1; // 销毁标志位
uint16_t ackHandlerCount; // 等待ack的记录个数
uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
uint32_t eventFlag; // 事件标志位
RyanMqttState_e clientState; // mqtt客户端的状态
RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
RyanList_t ackHandlerList; // 维护ack链表
RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表
platformTimer_t keepaliveTimer; // 保活定时器
platformTimer_t keepaliveDebounTimer; // 保活定时器消抖
platformNetwork_t network; // 网络组件
RyanMqttClientConfig_t config; // mqtt config
platformThread_t mqttThread; // mqtt线程
platformMutex_t msgHandleLock; // msg链表锁
platformMutex_t ackHandleLock; // ack链表锁
platformMutex_t userAckHandleLock; // 用户接口的ack链表锁
platformMutex_t sendBufLock; // 写缓冲区锁
platformCritical_t criticalLock; // 临界区锁
lwtOptions_t lwtOptions; // 遗嘱相关配置
uint8_t lwtFlag : 1; // 遗嘱标志位
uint8_t destoryFlag : 1; // 销毁标志位
uint16_t ackHandlerCount; // 等待ack的记录个数
uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
uint32_t eventFlag; // 事件标志位
RyanMqttState_e clientState; // mqtt客户端的状态
RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
RyanList_t ackHandlerList; // 维护ack链表
RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表
platformTimer_t ackScanThrottleTimer; // ack链表检查节流定时器
platformTimer_t keepaliveTimer; // 保活定时器
platformTimer_t keepaliveThrottleTimer; // 保活检查节流定时器
platformNetwork_t network; // 网络组件
RyanMqttClientConfig_t config; // mqtt config
platformThread_t mqttThread; // mqtt线程
platformMutex_t msgHandleLock; // msg链表锁
platformMutex_t ackHandleLock; // ack链表锁
platformMutex_t userAckHandleLock; // 用户接口的ack链表锁
platformMutex_t sendBufLock; // 写缓冲区锁
platformCritical_t criticalLock; // 临界区锁
lwtOptions_t lwtOptions; // 遗嘱相关配置
} RyanMqttClient_t;

/* extern variables-----------------------------------------------------------*/
Expand Down
16 changes: 11 additions & 5 deletions mqttclient/RyanMqttThread.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
RyanMqttError_e result = RyanMqttFailedError;
int32_t packetLen = 0;
uint32_t timeRemain = 0;
RyanMqttAssert(NULL != client);

// mqtt没有连接就退出
if (RyanMqttConnectState != RyanMqttGetClientState(client))
return RyanMqttNotConnectError;

uint32_t timeRemain = platformTimerRemain(&client->keepaliveTimer);
timeRemain = platformTimerRemain(&client->keepaliveTimer);

// 超过设置的 1.4 倍心跳周期
if (0 == timeRemain)
Expand All @@ -46,8 +47,8 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
else if (timeRemain < client->config.recvTimeout ||
timeRemain < 1000 * 0.5 * client->config.keepaliveTimeoutS)
{
// 消抖时间内不发送心跳包
if (platformTimerRemain(&client->keepaliveDebounTimer))
// 节流时间内不发送心跳报文
if (platformTimerRemain(&client->keepaliveThrottleTimer))
return RyanMqttSuccessError;

platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
Expand All @@ -62,7 +63,7 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
});
platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁

platformTimerCutdown(&client->keepaliveDebounTimer, 1500); // 启动心跳消抖定时器
platformTimerCutdown(&client->keepaliveThrottleTimer, 1500); // 启动心跳检查节流定时器
}

return RyanMqttSuccessError;
Expand Down Expand Up @@ -557,6 +558,10 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
if (RyanMqttConnectState != RyanMqttGetClientState(client))
return;

// 节流时间内不检查ack链表
if (platformTimerRemain(&client->ackScanThrottleTimer))
return;

platformMutexLock(client->config.userData, &client->ackHandleLock);
RyanListForEachSafe(curr, next, &client->ackHandlerList)
{
Expand Down Expand Up @@ -623,8 +628,9 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
}
}
}

platformMutexUnLock(client->config.userData, &client->ackHandleLock);

platformTimerCutdown(&client->ackScanThrottleTimer, 1000); // 启动ack scan节流定时器
}

/**
Expand Down

0 comments on commit 53d3452

Please sign in to comment.