RocketMQ 事务消息

某团队在实现分布式事务时,选择了 RocketMQ 的事务消息方案。

核心流程:

  1. 发送半消息到 MQ(对消费者不可见)
  2. 执行本地事务
  3. 根据本地事务结果,提交或回滚半消息
  4. 如果 RocketMQ 长时间未收到提交/回滚指令,主动回查本地事务状态

【架构权衡】 RocketMQ 事务消息是本地消息表方案的一种进化:将"消息表"的管理交给了 MQ 本身,减少了业务代码的复杂度。但它仍然需要本地事务和消息发送的配合,本质上仍然是"最终一致"而非"强一致"。


一、核心问题 🔴

1.1 事务消息的原理

┌─────────────────────────────────────────────────────────────────┐
│                      RocketMQ 事务消息流程                        │
│                                                                  │
│  生产者                                                           │
│     │                                                              │
│     ├──► 发送半消息(对消费者不可见)──► MQ ──► 半消息存储         │
│     │                                                              │
│     ├──► 执行本地事务 ──► 数据库                                   │
│     │                                                              │
│     ├──► 提交/回滚半消息 ──► MQ                                   │
│     │                                                              │
│     └──► 回查(如果长时间未收到结果)──► 查询本地事务状态         │
│                                                                  │
│  MQ broker                                                        │
│     │                                                              │
│     ├──► 半消息:对消费者不可见                                    │
│     ├──► 收到提交:标记消息为可投递                               │
│     └──► 收到回滚:删除消息                                       │
│                                                                  │
│  消费者                                                           │
│     │                                                              │
│     └──► 消费消息 ──► 处理业务 ──► ACK                             │
└─────────────────────────────────────────────────────────────────┘

1.2 事务消息代码实现

// 事务监听器
@Service
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Autowired
    private OrderMapper orderMapper;

    // 执行本地事务(半消息发送后调用)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String body = new String(msg.getBody());
        OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class);

        try {
            // 执行本地事务
            Order order = new Order();
            order.setUserId(orderMessage.getUserId());
            order.setAmount(orderMessage.getAmount());
            order.setStatus("CREATED");
            orderMapper.insert(order);

            // 本地事务成功,提交消息
            return LocalTransactionState.COMMIT_MESSAGE;

        } catch (Exception e) {
            // 本地事务失败,回滚消息
            log.error("本地事务执行失败", e);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 回查(长时间未收到提交/回滚指令时调用)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String body = new String(msg.getBody());
        OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class);

        // 查询本地事务状态
        Order order = orderMapper.findByUserId(orderMessage.getUserId());
        if (order != null) {
            // 本地事务已成功
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            // 本地事务未执行(或失败了)
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

// 发送事务消息
@Service
public class OrderService {

    @Autowired
    private TransactionMQProducer producer;

    public void createOrder(OrderMessage orderMessage) {
        Message msg = new Message("order-topic", JSON.toJSONString(orderMessage).getBytes());

        // 发送事务消息
        // 第二个参数是 arg,会传递给 TransactionListener.executeLocalTransaction
        producer.sendMessageInTransaction(msg, orderMessage);
    }
}

【架构权衡】 RocketMQ 事务消息的核心价值是减少了业务代码的复杂度

  • 本地消息表:业务需要管理 outbox 表、轮询发送
  • RocketMQ 事务消息:只需要实现 TransactionListener,MQ 自动管理消息状态

但它本质上仍然是"最终一致"而非"强一致"。

二、工程代价评估

维度评估
组件依赖中(需要 RocketMQ)
实现复杂度
一致性最终一致
延迟
可靠性高(MQ 保障)

三、落地 Checklist

  • TransactionListener 实现:实现 executeLocalTransaction 和 checkLocalTransaction
  • 幂等设计:消费者端必须幂等
  • 回查机制:实现回查接口
  • 监控告警:监控事务消息的状态