#RocketMQ 事务消息
某团队在实现分布式事务时,选择了 RocketMQ 的事务消息方案。
核心流程:
- 发送半消息到 MQ(对消费者不可见)
- 执行本地事务
- 根据本地事务结果,提交或回滚半消息
- 如果 RocketMQ 长时间未收到提交/回滚指令,主动回查本地事务状态
【架构权衡】 RocketMQ 事务消息是本地消息表方案的一种进化:将"消息表"的管理交给了 MQ 本身,减少了业务代码的复杂度。但它仍然需要本地事务和消息发送的配合,本质上仍然是"最终一致"而非"强一致"。
#一、核心问题 🔴
#1.1 事务消息的原理
┌─────────────────────────────────────────────────────────────────┐
│ RocketMQ 事务消息流程 │
│ │
│ 生产者 │
│ │ │
│ ├──► 发送半消息(对消费者不可见)──► MQ ──► 半消息存储 │
│ │ │
│ ├──► 执行本地事务 ──► 数据库 │
│ │ │
│ ├──► 提交/回滚半消息 ──► MQ │
│ │ │
│ └──► 回查(如果长时间未收到结果)──► 查询本地事务状态 │
│ │
│ MQ broker │
│ │ │
│ ├──► 半消息:对消费者不可见 │
│ ├──► 收到提交:标记消息为可投递 │
│ └──► 收到回滚:删除消息 │
│ │
│ 消费者 │
│ │ │
│ └──► 消费消息 ──► 处理业务 ──► ACK │
└─────────────────────────────────────────────────────────────────┘#1.2 事务消息代码实现
// 事务监听器
@Service
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private OrderMapper orderMapper;
// 执行本地事务(半消息发送后调用)
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String body = new String(msg.getBody());
OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class);
try {
// 执行本地事务
Order order = new Order();
order.setUserId(orderMessage.getUserId());
order.setAmount(orderMessage.getAmount());
order.setStatus("CREATED");
orderMapper.insert(order);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
log.error("本地事务执行失败", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查(长时间未收到提交/回滚指令时调用)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String body = new String(msg.getBody());
OrderMessage orderMessage = JSON.parseObject(body, OrderMessage.class);
// 查询本地事务状态
Order order = orderMapper.findByUserId(orderMessage.getUserId());
if (order != null) {
// 本地事务已成功
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务未执行(或失败了)
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
// 发送事务消息
@Service
public class OrderService {
@Autowired
private TransactionMQProducer producer;
public void createOrder(OrderMessage orderMessage) {
Message msg = new Message("order-topic", JSON.toJSONString(orderMessage).getBytes());
// 发送事务消息
// 第二个参数是 arg,会传递给 TransactionListener.executeLocalTransaction
producer.sendMessageInTransaction(msg, orderMessage);
}
}【架构权衡】 RocketMQ 事务消息的核心价值是减少了业务代码的复杂度:
- 本地消息表:业务需要管理 outbox 表、轮询发送
- RocketMQ 事务消息:只需要实现 TransactionListener,MQ 自动管理消息状态
但它本质上仍然是"最终一致"而非"强一致"。
#二、工程代价评估
| 维度 | 评估 |
|---|---|
| 组件依赖 | 中(需要 RocketMQ) |
| 实现复杂度 | 中 |
| 一致性 | 最终一致 |
| 延迟 | 低 |
| 可靠性 | 高(MQ 保障) |
#三、落地 Checklist
- TransactionListener 实现:实现 executeLocalTransaction 和 checkLocalTransaction
- 幂等设计:消费者端必须幂等
- 回查机制:实现回查接口
- 监控告警:监控事务消息的状态