本地事件表加消息队列实现分布式事务思路

本地事件表加消息队列实现分布式事件方案实现数据的最终一致性。本地事件表作用是为了事件溯源(Event-Sourcing),消息队列实现事件通知。

事件表要求记录了业务表操作的所有事件,所有事件的组合就是表中数据的生命周期。

本篇是 分布式微服务应用系列(九):分布式事务概念及解决方案 的延续。

本地事件表

本地事件表记录了操作对象时涉及分布式事务的每个步骤,作用是用于事件溯源(Event-Sourcing ),实现回滚操作。

一般应用在领域对象模型中。在一个 对象创建销毁 的整个生命周期中,会产生大量的事件(Event),每个事件都会有自己的 事件类型(Event Type)。

事件 包含时间、事件类型、模型等信息,事件类型可以是枚举的。可以将 事件 理解为某一个具体动作,如例,在某一时刻创建一个订单 则是一个事件,而 创建 是一种事件类型。

事件记录

在将对象最终状态记录到数据库时,还要记录对象所发生的一系列事件变更,这些事件需要按时间先后顺序记录到数据库中。也就是说,数据库中不仅包含了对象所属性的模型表,还包含了对象所产生事件的事件表。

当创建、修改、删除一条模型对象时(查询一般不考虑),不修需要个修改模型对象表中的数据,还需要记录一条对应的事件到事件表中,这个过程称为 事件记录

每个模型都有一个唯一ID,称为 Model ID。事件对象也同样有一个唯一的 ID,称为 Event ID,在事件表中关联了 Model ID,这样可以在事件表中 找到 Model ID,这一过程称为 事件溯源

事件对象

事件记录过程中,先操作模型表,再操作事件表,这两个操作是在同一个事务中;而在事件溯源过程中,是先操作事件表,再操作模型表,这两个操作是在同一个事务中。

事件表字段与事件模型对象属性对应,结构比较固定,也可根据实际需要灵活设计,但大体包括以下基础字段:

  • id:具有唯一性的事件 ID。
  • type:事件类型,例如,create,update,delete 等。
  • process:事件进行到的环节,如新建,已发布,已处理。
  • content:事件内容,通常是需要传递的JSON格式数据。
  • modelId:对应的模型对象 ID,具有唯一性。
  • createdTime:事件的创建时间,一般存时间戳,精确到毫秒。
  • updateTime:事件修改时间,一般存时间戳,精确到毫秒。。

方案实现思路

最终一致性分布式事务简略图

创建对象

创建用于封装事件的类和表,及事件处理器。

  • Event:封装事件相关字段,与 event 表映射。
  • EventType:枚举类定义事件类型。
  • EventHandler:事件处理器,封装事件相关操作。
  • Foo:Foo 对象模型(实体类),与 Foo Table 映射。
  • Bar:Bar 对象模型(实体类),与 Bar Table 表映射。

实现思路

  1. 第一步:Foo Service 操作 Foo 模型表与 event 事件表,并将事件写入消息队列

    1. 在 Foo Service 中插入一条 Foo 对象到 Foo Table 中
    2. 同时创建一条名 Create Foo 的事件(Event 对象)
    3. 将 Event 对象插入到 Event Table 中,与插入 Foo 对象在同一个事务中提交。
    4. 在事条提交后,将 Event 对象发送到 MQ 的 foo-success-queue 消息队列中。
  2. 第二步:Bar Service 从消息队列中获取事件,操作 Bar 模型表,若有异常,则将源事件发送到 foo-fail-queue 队列中。

    1. 从 foo-success-queue 中获取 Event 对象。

    2. 将 Bar 对象插入模型表中。

    3. 若 JDBC 操作出现异常。

    4. 则在 catch 中将 Event 对象写入 foo-fail-queue 队列,并抛出异常让事务回滚。

      注意:如果使用的是 RabbitMQ,则在 catch 中必须抛出 Spring AMQP 框架提供的 AmqpRejectAndDontRequeueException 异常(集成 Spring AMQP 框架),否则即使事务回滚,还会重新进入 foo-success-queue 队列,重复处理相同的消息,导致此过程不断循环,直到超出 RabbitMQ 队列所限制的 Reuque 次数,这是不希望看到的。

      当抛出 AmqpRejectAndDontRequeueException 时,Event 对象将不再进入 foo-success-queue 队列,而是进入 DLQ (Dead Letter Queue,死信队列),进入的消息将被丢弃永不被消费,或人为执行额外处理。

  3. 第三步:Foo Service 从消息队列中获取事件,操作事件表与模型表(即 事件溯源 过程)

    1. 从 foo-fail-queue 中取回 Event 对象
    2. 根据 Event 中的 Modele ID 到 Event Table 中查出 Foo ID。
    3. 根据 Foo ID 到 Foo Table 中删除曾经创建的 Foo 对象。

注意事项

  1. INSERT 语句的逆向操作是 DELETE 语句,UPDATE 语句的逆向操作也是 UPDATE 语句,但对于 DELETE 语句,通常采用逻辑删除(更新删除标记字段),而不是物理删除,因此 DELETE 语句的逆向操作也是 UDPATE 语句。
  2. 在实际应用环境中,应尽量避免分布式事务,通过合理的切换微服务边界可解决大部分分布式事务问题。

实现补充

上面方案完全是基于 MQ 解耦的方式实现,即消息消费服务是完全独立的。

但在 Spring Cloud 或微服务环境,Foo Service 服务通常会直接调用 Bar Service 接口,就有可能出现接口调用失败,或 Bar 服务修改失败的情况,且对于这些失败希望人为可知,并实现一些补偿机制。

可以通过创建一个 可靠性消息服务消息发送服务 的应用来解决上述的问题,大致流程如下:

  1. Foo Service 进行修改操作。若成功则调用 可靠性消息服务 发送修改操作的信息;若失败,则回滚修改,不发送消息给可靠性消息服务。

  2. 可靠性消息服务收到信息之后存入消息表中,此服务器负责接口信息并存储,修改消息数据,不负责发送逻辑。

    封装信息实体类与的表结构映射,字段如下:

    • id:信息唯一ID。
    • message:消息内容,以 JSON 存储。
    • queue:消息队列名称。
    • send_system:发生消息的服务名。
    • send_count:重复发送消息次数。
    • create_time:创建时间
    • last_send_time:最后发送消息的时间。
    • status:消息状态:0-等待消费,1-已消费,2-已死亡。
    • die_count:死亡次数,由使用方决定,默认为发送 10 次还没被消费则标记死亡,人工介入。
    • consume_time:消费时间
    • consume_system:消费的服务名
    • die_time:死亡时间。
  3. 消息发送服务启动一个线程,在线程中一直循环从消息表中获取没有处理的信息,发送到消息队列;如果消息的发送次数已超过死亡的次数,则改成死亡信息不再进行处理。

    注意:消息发送服务可能会有多个实例,需要使用分布式锁,只让一个实例执行,若没有要处理的消息,则休眠一段时间,最后释放分布式锁。

  4. Bar Service 消费 MQ 中的消息,然后进行需要的修改操作,成功后调用可靠性消息服务的接口,进行消息已被正常消费的确认工作(更改消息状态相关数据)。当发生异常时,MQ 会重发消息,直到超过重发的次数。

  5. 另可开发消息管理系统,通过 Web 控制台管理消息,具体有以下功能点:

    • 查看消息列表。
    • 可以根据不同的状态查询消息。
    • 对死亡的消息进行重发操作。
    • 删除已被消费的消息。

最大努力通知型事务

最大努力通知型事务适用于跟外部系统之间的通讯,通过定期通知的方式来达到数据的一致性。

尽最大的努力通知对方,但无法保证一定能通知到,可以提供查询接口给对方查询。

例如,调用支付系统,需要一个回调地址,在支付成功后,支付系统将支付结果返回给回调地址,如果支付系统没有收到客户机返回的确认,支付系统会重复回调,直到通知 N 次后不再通知,同时提供支付结果查询接口给客户机调用。

相关参考

  1. 聊聊分布式事务
  2. 谈谈分布式事务
  3. 分布式事务系列文章1
  4. 分布式事务系列文章2
  5. 分布式事务的四种解决方案
作者

光星

发布于

2019-07-28

更新于

2022-07-12

许可协议

评论