博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ事务消息实战
阅读量:5911 次
发布时间:2019-06-19

本文共 3049 字,大约阅读时间需要 10 分钟。

我们以一个订单流转流程来举例,例如订单子系统创建订单,需要将订单数据下发到其他子系统(与第三方系统对接)这个场景,我们通常会将两个系统进行解耦,不直接使用服务调用的方式进行交互。其业务实现步骤通常为:

   1、A系统创建订单并入库。
   2、发送消息到MQ。
   3、MQ消费者消费消息,发送远程RPC服务调用,完成订单数据的同步。
   1、方案一

   方案弊端:

   1、如果消息发送成功,在提交事务的时候JVM突然挂掉,事务没有成功提交,导致两个系统之间数据不一致。
   2、由于消息是在事务提交之前提交,发送的消息内容是订单实体的内容,会造成在消费端进行消费时如果需要去验证订单是否存在时可能出现订单不存在。
   3、消息发送可以考虑异步发送。
   方案二:
   由于存在上述问题,在MQ不支持事务消息的前提条件下,可以采用下面的方式进行优化。

   然后在控制器层,使用异步发送,将消息发送,并在消息发送成功后,更新待发送状态为已发送。

   然后通过定时任务,扫描待发送,结合创建时间的记录(小于当前时间5分钟的消息待发送记录),进行消息发送。
   方案弊端:
   1、消息有可能重复发送,但在消费端可以通过唯一业务编号来进行去重设计。
   2、实现过于复杂,为了避免 极端情况下的消息丢失,需要使用定时任务。
   方案三:基于RocketMQ4.3版本事务消息

   额外需要实现事务会查监听器:TransactionListener,其实例代码:

import org.apache.rocketmq.client.producer.LocalTransactionState;import org.apache.rocketmq.client.producer.TransactionListener;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;@SuppressWarnings("unused")public class OrderTransactionListenerImpl implements TransactionListener {        private ConcurrentHashMap
countHashMap = new ConcurrentHashMap<>(); private final static int MAX_COUNT = 5; @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。 // 将bizUniNo入库,表名:t_message_transaction,表结构 bizUniNo(主键),业务类型。 return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = 0; // 从数据库查查询t_message_transaction表,如果该表中存在记录,则提交, String bizUniNo = msg.getUserProperty("bizUniNo"); // 从消息中获取业务唯一ID。 // 然后t_message_transaction 表,是否存在bizUniNo,如果存在,则返回COMMIT_MESSAGE, // 不存在,则记录查询次数,未超过次数,返回UNKNOW,超过次数,返回ROLLBACK_MESSAGE if(query(bizUniNo) > 0 ) { return LocalTransactionState.ROLLBACK_MESSAGE; } return rollBackOrUnown(bizUniNo); } public int query(String bizUniNo) { return 1; //select count(1) from t_message_transaction a where a.biz_uni_no=#{bizUniNo} } public LocalTransactionState rollBackOrUnown(String bizUniNo) { Integer num = countHashMap.get(bizUniNo); if(num != null && ++num > MAX_COUNT) { countHashMap.remove(bizUniNo); return LocalTransactionState.ROLLBACK_MESSAGE; } if(num == null) { num = new Integer(1); } countHashMap.put(bizUniNo, num); return LocalTransactionState.UNKNOW; } }

 TransactionListener 实现要点:

   executeLocalTransaction:
   该方法,主要是设置本地事务状态,该方法与业务方代码在一个事务中,例如OrderServer#createMap中,只要本地事务提交成功,该方法也会提交成功。
   故在这里,主要是t_message_transaction添加一条记录,在事务会查时,如果存在记录,就认为是该消息需要提交。
   checkLocalTransaction:
   该方法主要是告知RocketMQ消息是否需要提交还是回滚,如果本地事务表(t_message_transaction)存在记录,则认为提交,如果不存在,可以设置会查次数,如果指定次数内还是未查到消息,则回滚,否则返回未知,rocketmq会按一定的频率回查事务,当然回查次数也有限制,默认为5次,可配置。

转载于:https://www.cnblogs.com/bootdo/p/10518134.html

你可能感兴趣的文章
如何写出小而清晰的函数?(JS 版)
查看>>
解析:深度学习框架Caffe源码
查看>>
TCL华星光电与华显光电重组:疑为旗下公司上市铺路
查看>>
国家风光储输示范工程:破解新能源世界难题
查看>>
解读《森林防火视频监控系统技术规范》
查看>>
为什么美国科技巨头纷纷押注非洲?
查看>>
IT战略投资创收下滑7% 联想筹钱加码PC核心
查看>>
昆明视频监控建设行之有效 明年底实现百分百覆盖
查看>>
Gartner AI商业观察:2021年行业解决方案30%营收净增长来自AI
查看>>
AT&T宣布16亿美元收购一小型无线创业公司
查看>>
高德布局大数据生态 向智慧城市延伸
查看>>
Android原生Email模块使用的数据文件分析
查看>>
MPP分布式数据库性能评估方法 - 阿里云HybridDB for PostgreSQL最佳实践
查看>>
数“聚”天下 盐城开启产业先导新时代
查看>>
通讯应用Kik推出聊天机器人商店
查看>>
“云上贵州”大赛完整诠释大数据三大业态
查看>>
中兴国际热度暴增幕后:令美国人担心的跨国品牌
查看>>
看知名企业CIO如何吐槽ERP
查看>>
OTT:全球联网设备超过80亿部,流量增速惊人
查看>>
安防监控小型企业 应对困境的出路
查看>>