MQ的消费在很多场景下是需要使用异步方式来实现,这样系统可以并发处理多条消息,从而显著提升系统的吞吐量。
异步处理带来的问题:
- 消息接收成功,不代表消息处理成功; 甚至消息只是存储在本地队列中,根本没有执行。系统异常或是应用重启,都可能导致消息的丢失。
- 很多因素影响异步处理的性能,需要全面的评估消息异步处理的并发量。
异步消费的思路是:大事务 = 小事务 + 异步;将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。
具体做法是:
-
MQ消费者接收消息,使用数据库中存储消息并进行消息的去重(消息状态:init=新消息), 然后把消息放入异步队列,最后回复ACK到MQ;此阶段是保证消息接收成功,并去除重复消息。
-
任务开始执行的时候,更新执行次数和任务状态为(running=消费中)。消息接收成功到消息开始执行之间的时间为T1, 此时间用于超时重试。
-
任务执行完成,记录任务状态(success=消费成功、failure=消费失败)。消息开始执行到执行完成之间的时间为T2, 此时间同样用于超时重试。
-
开定时任务,从数据库获取需要重新执行的消息。重新执行的消息有: T1超时,T2超时,执行失败。
-
加载数据的时候,注意多实例防止重复操作数据,防止并发问题,加上乐观锁。
需要注意:
对于大多系统 T1,T2,重试次数的可静态配置;可添加策略,通过消息获取对应的T1,T2和重试次数。
异步处理在客户端可能会有消息堆积的情况,需要合理的设置并发量;此数据需要使用实际的压力测试来获取。
- 消息表
字段 | 注释 |
---|---|
id | 主键 |
name | 消息主题Topic |
status | 消息状态 |
message_id | 提取的消息ID |
raw_message | 序列化为字符串的消息 |
retry | 重试次数 |
update_time | 更新时间 |
version | 乐观锁 |
- 消息状态变更表
状态编码 | 名称 |
---|---|
init | 初始化 |
running | 运行中 |
success | 执行成功 |
failure | 执行失败 |
retry | 重试 |