《RocketMQ实战与进阶》08 结合实际场景顺序消费、消息过滤实战

张开发
2026/5/3 2:23:38 15 分钟阅读
《RocketMQ实战与进阶》08 结合实际场景顺序消费、消息过滤实战
顺序消费业务场景描述现在开发一个银行类项目对用户的每一笔余额变更都需要发送短信通知到用户。如果用户同时在电商平台下单转账两个渠道在同一时间进行了余额变更此时用户收到的短信必须顺序的例如先网上购物消费了 128余额 1000再转账给朋友 200剩余余额 800如果这两条短信的发送顺序颠倒给用户会带来很大的困扰故在该场景下必须保证顺序。这里所谓的顺序是针对同一个账号的不同的账号无需保证顺序性例如用户 A 的余额发送变更用户 B 的余额发生变更这两条短信的发送其实相互不干扰的故不同账号之间无需保证顺序。代码实现本篇代码主要采用截图的方式展示其关键代码并对其进行一些简单的解读。首先这里的主业务是操作账户的余额然后是余额变更后需要发短信通知给用户但由于发送短信与账户转载是两个相对独立但又紧密的操作故这里可以引入消息中间件来解耦这两个操作。但由于发送短信业务其顺序一定要与扣款的顺序保证一致故需要使用顺序消费。由于 RocketMQ 只提供了消息队列的局部有序故如果要实现某一类消息的顺序执行就必须将这类消息发送到同一个队列故这里在消息发送时使用了 MessageQueueSelector并且使用用户账户进行队列负载这样同一个账户的消息就会账号余额变更的顺序到达队列然后队列中的消息就能被顺序消费。顺序消费的事件监听器为 MessageListenerOrderly表示顺序消费。顺序消费在使用上比较简单那 RocketMQ 顺序消费是如何实现的队列重新负载时还能保持顺序消费吗顺序消费会重复消费吗RocketMQ 顺序消费原理简述在 RocketMQ 中PUSH 模式的消息拉取模型如下图所示上述流程在前面的章节中已做了详述这里不再累述这里想重点突出线程池。RocketMQ 消息消费端按照消费组进行的线程隔离即每一个消费组都会创建已线程池由一个线程池负责分配的所有队列中的消息。**所以要保证消费端对单队列中的消息顺序处理故多线程处理需要按照消息消费队列进行加锁。**故顺序消费在消费端的并发度并不取决消费端线程池的大小而是取决于分给给消费者的队列数量故如果一个 Topic 是用在顺序消费场景中建议消费者的队列数设置增多可以适当为非顺序消费的 2~3 倍这样有利于提高消费端的并发度方便横向扩容。消费端的横向扩容或 Broker 端队列个数的变更都会触发消息消费队列的重新负载在并发消息时在队列负载的时候一个消费队列有可能被多个消费者同时消息但顺序消费时并不会出现这种情况因为顺序消息不仅仅在消费消息时会锁定消息消费队列在分配到消息队列时能从该队列拉取消息还需要在 Broker 端申请该消费队列的锁即同一个时间只有一个消费者能拉取该队列中的消息确保顺序消费的语义。从前面的文章中也介绍到并发消费模式在消费失败是有重试机制默认重试 16 次而且重试时是先将消息发送到 Broker然后再次拉取到消息这种机制就会丧失其消费的顺序性故如果是顺序消费模式消息重试时在消费端不停的重试重试次数为 Integer.MAX_VALUE即如果一条消息如果一直不能消费成功其消息消费进度就会一直无法向前推进即会造成消息积压现象。温馨提示顺序消息时一定要捕捉异常必须能区分是系统异常还是业务异常更加准确的要能区分哪些异常是通过重试能恢复的哪些是通过重试无法恢复的。无法恢复的一定要尽量在发送到 MQ 之前就要拦截并且需要提高告警功能。消息过滤实战业务场景描述例如公司采用的是微服务架构分为如下几个子系统基础数据、订单模块、商家模块各个模块的数据库都是独立的。微服务带来的架构伸缩性不容质疑但数据库的相互独立对于基础数据的 join 操作就不那么方便了即在订单模块需要使用基础数据还需要通过 Dubbo 服务的方式去请求接口为了避免接口的调用基础数据的数据量又不是特别多的情况项目组更倾向于将基础数据的数据同步到各个业务模块的数据库然后基础数据发生变化及时通知订单模块这样与基础数据的表 join 操作就可以在本库完成。技术方案上述方案的关键思路基础数据一旦数据发生变化就向 MQ 的 base_data_topic 发送一条消息。下游系统例如订单模块、商家模块订阅 base_data_topic 完成数据的同步。问题如果订单模块出现一些不可预知的错误导致数据同步出现异常并且发现的时候存储在 MQ 中的消息已经被删除此时需要上游基础数据重推数据这个时候如果基础数据重推的消息直接发送到 base_data_topic那该 Topic 的所有消费者都会消费到这显然是不合适的。怎么解决呢通常有两种办法为各个子模块创建另外一个主题例如 retry_ods_base_data_topic这样需要向哪个子系统就向哪个 Topic 发送。引入 Tag 机制。本节主要来介绍一下 Tag 的思路。首先正常情况下基础模块将数据变更发送到 base_data_topic并且消息的 Tag 为 all。然后为每一个子系统定义一个单独的重推 Tag例如 ods、shop。消费端同时订阅 all 和各自的重推 Tag完美解决问题。代码实现在消息发送时需要按照需求指定消息的 Tag其示例代码如下然后在消息消费时订阅时更加各自的模块订阅各自关注的 Tag其示例代码如下在消息订阅时一个消费组可以订阅多个 Tag多个 Tag 使用双竖线分隔。Topic 与 Tag 之争用 Tag 对同一个主题进行区分会引来一个“副作用”就是在重置消息消费位点时该消费组需要“处理”的是所有标签的数据虽然在 Broker 端、消息消费端最终会过滤不符合 Tag 的消息并不会执行业务逻辑但在消息拉取时还是需要将消息读取到 PageCache 中并进行过滤会有一定的性能损耗但这个不是什么大问题。在数据推送这个场景除了使用 Tag 机制来区分重推数据外也可以为重推的数据再申请一个额外的主题即通过主题来区分不同的数据这种方案倒不说不可以但这个在运维管理层面需要申请众多的 Topic而这类 Topic 存储的其实是一类数据使用不同的 Topic 存储同类数据会显得较为松散。当然如果是不同的业务场景就建议使用 Topic 来隔离。小结本篇主要从两个贴近实战场景结合场景来介绍如何使用顺序消息、消息过滤所有的示例代码整合在一个 Spring Boot Dubbo RocketMQ MyBatis 的工程中。

更多文章