• -------------------------------------------------------------
  • ====================================

利用Rocketmq4.2版来实现分布式事务

kafka dewbay 6年前 (2019-04-12) 2433次浏览 已收录 0个评论 扫描二维码

花了点时间学了RocketMQ,下面是本人的一点点心得,如果觉的写的好就点个赞,但如果你要借鉴话,我还是劝你看下面参考资料里的视频(作者为阿里牛人),虽然他分享的视频是为了推销阿里云的 DRDS、ONS(RocketMQ阿里版),只是讲了个大概,没有细说,但是指明一个大的方向,让人非常的受益。

借用阿里牛人视频中的 ppt:来说明单机事务拆分成分布式事务分解思想(具体自己看视频)。

图中左边的事务 3 与事务 5 为远程调用的网络事务。

上图为消息发送者端通过消息集群完整的事务流程,ONS 消息集群这里用 rockMQ 集群代替。

RocketMQ在 V3.1.5 开始,使用 数据库 实现【事务状态】的存储。但未开源,因为 rocketmq 阉割了对生产者的 LocalTransactionState 状态的回查机制,所以增加了生产端事务的复杂度。本来由RocketMQ中间件通过回查机制来让生产者知道事务信息发送成功,现在要生产者自己来确认。(后面有详情讲解事务消息同时成功或失败的情况)

如下是来自官方文档的说明,可能是阉割了对生产者的 LocalTransactionState 状态的回查机制的一部分原因:
RocketMQ 这种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,存在一个显著缺陷,即通过 Offset 更改数据,会令系统的脏页过多,需要特别关注,

更新:RocketMQ新版本又开源了内置回查机制,详情查看 http://rocketmq.apache.org/docs/simple-example/

我们先看一下消息发送者端成功执行事务并发送确认消息后通过 RocketMQ 的控制台用 key 值查看成功执行事务中传递的消息:在 RocketMQ3.2.6 版本中是一条消息,在 RocketMQ4.2 版本中发现有两条消息。虽然这条消息是 mq 集群在成功接收生产者提交发送的 COMMIT_MESSGE 消息(通过 oneway 方式)后在消息集群本地产生,没有增加生产者发送消息的网络延时时间,但这种实现方式也是有代价的,事务消息增加一倍(虽然是在 MQ 集群本机拷贝,但增加了集群的 IO 压力),官方这样改应该是益大于弊吧。

下面是按 key 值查询事务成功提交 COMMIT_MESSGE 消息后的返回信息:

QueryResult
[indexLastUpdateTimestamp=1516002830440,
messageList=[

  1. MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
    Message

[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]

], MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=8, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830440, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000369, commitLogOffset=873, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=582, toString()=
Message

[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]

]
]
]
sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
Message

[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]

], MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=8, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830440, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000369, commitLogOffset=873, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=582, toString()=
Message

[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]

]
]
]
共返回两条消息:两条消息中大部分数据是一样的,但 sysFlag、storeTimestamp、msgId、commitLogOffset、preparedTransactionOffset 字段是不一样的:其中第 1 条为 prepared 发送的消息,第 2 条只有在提交 COMMIT_MESSGE 消息成功后产生。

注意 sysFlag、preparedTransactionOffset 字段与 prepared 消息的区别,当提交 COMMIT_MESSGE 消息成功后,推测 MQ 集群做了如下动作:1. 读取 prepared 消息,修改 sysFlag、preparedTransactionOffset 值,2. 在存入 commitlog 日志文件,设置 consumerqueue 序列;因为当作一条新的消息处理,所以 toreTimestamp、msgId、commitLogOffset 字段自然也就变了。所以按照发送的 prepared 消息的返回结果显示的 msgId 查看 sysFlag 状态只是 prepared 消息的 sysFlag 状态,RocketMQ4.2 版本的话要用 key 值去查询,才能查看事务提交成功的消息标志 sysFlag=8。

下面是按 key 值查询事务失败提交 ROLLBACK_MESSAGE 消息后的返回信息:

QueryResult
[indexLastUpdateTimestamp=1516002830440,
messageList=[

  1. MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
    Message

[topic=pay, flag=0, properties=
{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
body=67]

]
]
]
如果发送 ROLLBACK_MESSAGE 消息,在控制台只会查到一条 prepared 消息,MQ 集群对 prepared 消息不作任何处理。

上图为消息接收者端通过消息集群完整的事务流程。(后面有详情讲解消息超时与重复的问题)


分布式事务流程看下面的流程图:

分发布式事务通过消息中间件解耦为相互独立的(本地事务+异步)而本地事务间消息传递统一由消息中间件负责

一、消费者集群事务:

  1. 在执行本地事务时要注意:本地事务要尽量保证幂等性(如 s*s = s,也就是事务不管执行多少次结果都一样),如不能保证幂等性,要在业务上去对消息消费的去重(在消费者集群添加去重表,在事务开始前校验此消息是否重复,在事务提交前插入相关数据。去重表具体参见生产者集群事务的回查表,可以省略 count 字段)。因为 RocketMQ 不保证信息不重复,虽然重复几率很小。
  2. 对于消费者集群执行本地事务失败的情况,阿里提供给我们的解决方法是:人工解决。按照事务的流程,因为某种原因事务失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现 Bug,估计出现 Bug 的概率会比消费失败的概率大很多。这也是 RocketMQ 目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。
  3. 对于阿里来说,一个分布式事务可以能涉及的是几百个子系统,对于他们来说处理分布式事务回滚代价太大;但对于的分布式事务只涉及几个子系统,回滚不太复杂的情况下,我想是否可以对回滚分布式事务可以用一个反向的分布式事务解决,代价就是在消费端本地事务处理失败,回滚本地事务后发送一条分布式事务失败消息给生产者。而生产者需要额外为分布式事务设计相对应的回滚分布式事务的接口。视频中的阿里牛人说阿里分布式事务中消费失败的几率很小,在他印象中一两年才出现一次,这样的几率是否值得我们去做一个回滚分布式事务的设计呢?

二、生产者集群的事务:

因为 rocketmq 阉割了对 LocalTransactionState 状态的回查机制,所以生产者必须确认 rocketMQ 集群是否收到 LocalTransactionState 状态;

更新:RocketMQ 新版本又开源了内置回查机制,这里的生产者的回查表不需要了。详情查看 http://rocketmq.apache.org/docs/simple-example/

这里只需要考虑本地事务执行成功后的情况(因为本地事务失败不管确认消息发送成功与失败 MQ 集群都不会再发送消息到消费者):

1. 本地事务成功后宕机,确认消息没有发出,分布式事务只执行一半。

  1. 确认消息 COMMIT_MESSGE 发出,但因网络不可达 RocketMQ 集群没收到。
  2. 确认消息 COMMIT_MESSGE 发出,RocketMQ 集群收到 COMMIT_MESSGE 消息,但 rocketmq 取消了回查机制,
    生产者还是不知道 COMMIT_MESSGE 发出是否成功。

上面三种情况的本质是一样的,就是生产者本地事务成功后,COMMIT_MESSGE 消息是否送达 rocketmq 集群;所以可以看做同一种情况.

解决方案流程图:

官方在 rocketmq 集群上使用了数据库来实现回查机制,那我也学官方用数据库来实现回查机制,只是我把回查机制放在了生产者集群上。

一、在执行本地事务 commit 前向回查表插入消息的 KEY 值。

二、在生产者集群上设置一个定时任务(根据自身分布式事务流程执行的时间设定)。

      1. 从回查表获取 CONFIRM 为 0 的记录列表,从记录列表中获取 COUNT 为 3 的记录,当 count 列达到指定阀值(假定是 3)时:
此时记录的 COUNT 为 3,如果 CONFIRM 还是为 0,那么说明对此事务的回查次数为 3,但 RocketMQ 集群还未收到 COMMIT_MESSAGE 消息,说明发送 COMMIT_MESSAGE 消息失败,但本地事务已经执行成功,那么必须要重发与此条记录中 KEY 值相对应的 Perpared 消息的确认消息。根据 KEY 值向 MQ 集群查找消息,根据获取的消息重新用同步的方式发送此条消息到 MQ 集群,并更新此记录的 CONFIRM 为 1,COUNT+1
      2. 根据第 1 步获取的记录列表,取出 CONFIRM 为 0 且 COUNT 小于 3 的记录,根据 KEY 值向 MQ 集群查找消息。
      3. 根据第 2 步获取的消息判断是否是 sysFlag 为 8 的消息;如果是,更新回查表对应 KEY 记录的 CONFIRM 为 1,COUNT 为 count+1,如果不是,更新回查表对应 KEY 记录的 COUNT 为 count+1。

参考资料:

强烈建议看他的视频:总共有十个视频,都是关于分布式事务与 RocketMQ。

https://www.jianshu.com/p/453c6e7ff81c 根据上面视频而写的。

https://segmentfault.com/a/1190000009512510 事务源码

作者:要懂得舍得
来源:CSDN
原文:https://blog.csdn.net/zyw23zyw23/article/details/79070044
版权声明:本文为博主原创文章,转载请附上博文链接!


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:利用Rocketmq4.2版来实现分布式事务
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址