RocketMQ 事务消息原理

候选人小赵在蚂蚁金服面试中被问到:"RocketMQ的事务消息是怎么实现的?"

小赵说:"就是发送半消息,执行本地事务,然后提交或回滚。"

面试官点点头:"好,那第二阶段是什么?MQ怎么知道本地事务成功了还是失败了?"

小赵愣了一下:"呃...发送消息的时候顺便告诉MQ?"

面试官追问:"如果本地事务执行成功了,但MQ还不知道,这时候MQ挂了怎么办?"

小赵彻底答不上来了。

【面试官心理】

这道题是RocketMQ区别于Kafka的核心能力,也是生产环境分布式事务的实战难点。90%的候选人知道"半消息"这个词,但说不清"事务消息回查"这个第二阶段的完整流程——包括回查次数、间隔、边界条件。更不知道"半消息对Consumer不可见"的实现原理。能完整讲清楚三阶段流程和异常处理的,才是真正在生产环境用过事务消息的P6+。

一、核心问题:事务消息的三阶段流程 🔴

1.1 问题拆解

第一层:基本概念 面试官问:"RocketMQ的事务消息是什么?和普通消息有什么区别?" 候选人答:"事务消息可以保证本地事务和消息发送的原子性..." 考察点:基本概念

第二层:流程细节 面试官追问:"半消息是什么?对Consumer可见吗?执行完本地事务后怎么提交?" 候选人答:...(拉开点1) 考察点:流程细节

第三层:回查机制 面试官追问:"如果本地事务执行成功了,但提交消息的时候MQ挂了怎么办?MQ怎么知道?" 候选人答:...(核心拉开点) 考察点:回查机制

第四层:异常处理 面试官追问:"回查也有次数限制吗?如果15次回查都失败了怎么办?" 候选人答:...(P7区分点) 考察点:边界条件

1.2 错误示范

候选人原话 A:"事务消息就是发送消息和执行本地事务在一个事务里。"

问题诊断

  • 完全错误!MQ是独立的消息系统,不参与本地数据库事务
  • RocketMQ的事务消息是通过"半消息+回查"两阶段提交实现的
  • 说出这话说明根本没用过事务消息

候选人原话 B:"半消息就是还没发送的消息,等本地事务成功后再发送。"

问题诊断

  • 半消息不是"还没发送",而是"已发送但对Consumer不可见"
  • 半消息已经存在Broker上了,不是本地暂存
  • 概念混淆严重

候选人原话 C:"RocketMQ的事务消息可以保证消息一定送达,不会丢失。"

问题诊断

  • 事务消息不能保证消息"一定送达",只是保证"本地事务和消息发送的一致性"
  • 如果本地事务失败,半消息会被回滚(删除)
  • 如果MQ的回查机制也失败了(15次全失败),消息会进入死信队列
  • 过度承诺不可取

1.3 标准回答

事务消息的本质:两阶段提交 + 半消息机制

RocketMQ事务消息解决的核心问题是:如何保证本地数据库操作和消息发送的一致性?

传统方案的痛点:
  方案1:先发消息,再执行本地事务
    → 消息发出去了,但本地事务失败了
    → 消息已投递,下游消费了,但本地状态不对
    → 数据不一致!

  方案2:先执行本地事务,再发消息
    → 本地事务成功了,但发消息时MQ挂了
    → 本地状态对了,但消息没发出去
    → 下游不知道业务已发生!
    → 数据不一致!

RocketMQ的解决方案:两阶段提交
  第一阶段:发送半消息(对Consumer不可见)
  第二阶段:执行本地事务,根据结果提交或回滚半消息
  第三阶段(补偿):如果MQ不知道结果,主动回查

三阶段详解

阶段1:发送半消息(Half Message)

Producer:
  transactionMQProducer.sendMessageInTransaction(msg, "ORDER_TAG", arg);

Broker:
  收到半消息 → 写入CommitLog
  → 写入TransCoefficientTopic_XX(事务队列)
  → 标记为"pending"状态
  → 返回发送成功给Producer

Consumer视角:
  订阅该Topic的Consumer看不到这条消息
  因为ConsumerQueue里没有这条消息的记录
  半消息对Consumer完全不可见

阶段2:执行本地事务(Local Transaction)

Broker:
  向Producer的TransactionListener发回调请求
  listener.executeLocalTransaction(msg, arg):
    → 执行本地数据库操作(扣库存、转账等)
    → 返回COMMIT(提交)或ROLLBACK(回滚)

Producer:
  根据本地事务结果:
    COMMIT → 发提交请求给Broker
    ROLLBACK → 发回滚请求给Broker

Broker:
  收到COMMIT → 删除半消息(标记为已删除)
             → 将原消息写入目标Topic
             → Consumer可见
  收到ROLLBACK → 删除半消息
               → Consumer永远看不到这条消息
阶段3:事务回查(Transaction Check)

异常场景:
  1. 本地事务执行成功了
  2. 发COMMIT请求给Broker时网络超时
  3. Broker没收到COMMIT,半消息一直是pending状态

MQ的处理:
  Broker定时扫描pending状态的半消息
  → 主动回调Producer的checkLocalTransaction()
  → Producer查询本地事务状态
  → 决定COMMIT还是ROLLBACK

回查机制参数:
  transactionTimeout = 6s   # 事务超时时间
  transactionCheckMax = 15  # 最大回查次数
  transactionCheckInterval = 1000 # 回查间隔(递增:1s, 2s, 4s...)

【面试官心理】

这道题我考察的是对两阶段提交完整流程的理解。说"发送半消息、执行本地事务、提交"只是表层描述,关键是要理解"半消息对Consumer不可见"的实现原理(半消息不写入ConsumerQueue),以及"回查机制"的必要性——网络故障时MQ无法知道本地事务结果,必须主动查询。能说出回查间隔递增设计的,说明认真看过源码。

1.4 追问升级

P6/P7 差距拉开点:

面试官问:"RocketMQ事务消息的半消息存在哪里?会不会占用大量磁盘空间?"

这道题的分水岭:

  • P5:不知道半消息存在哪里
  • P6:知道半消息存在TransCoefficientTopic,但说不清回收机制
  • P7:能说出半消息会被定时清理(超过transactionTimeout仍未COMMIT/ROLLBACK的会被ROLLBACK),能分析磁盘空间占用的边界

二、延伸问题:事务消息的完整代码实现 🟡

2.1 TransactionListener 的实现

// 事务消息监听器
public class OrderTransactionListener implements TransactionListener {

    // 阶段2:执行本地事务(这个方法在发送半消息后由Broker回调)
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = msg.getKeys();
        Order order = (Order) arg;

        try {
            // 1. 创建订单(本地事务)
            orderService.createOrder(order);

            // 2. 扣减库存
            inventoryService.deduct(order.getItems());

            // 3. 扣减余额
            accountService.deduct(order.getUserId(), order.getAmount());

            // 全部成功,提交事务消息
            return LocalTransactionState.COMMIT_MESSAGE;

        } catch (InsufficientBalanceException e) {
            // 余额不足,回滚订单和库存
            orderService.cancelOrder(orderId);
            inventoryService.restore(order.getItems());
            return LocalTransactionState.ROLLBACK_MESSAGE;

        } catch (Exception e) {
            // 其他异常,先返回UNKNOWN,让MQ回查
            // 如果是临时故障(如数据库连接超时),回查后可能成功
            return LocalTransactionState.UNKNOW;
        }
    }

    // 阶段3:回查本地事务状态(MQ主动调用)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getKeys();

        try {
            // 查询本地数据库,确认订单状态
            Order order = orderService.getOrderById(orderId);

            if (order == null) {
                // 订单不存在,说明本地事务失败了(被回滚了)
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }

            if ("PAID".equals(order.getStatus())) {
                // 订单已支付,说明本地事务成功了
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            if ("PENDING".equals(order.getStatus())) {
                // 订单还在处理中,可能是临时状态,让MQ继续等待
                return LocalTransactionState.UNKNOW;
            }

            // 其他状态,保守处理,回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;

        } catch (Exception e) {
            // 查询失败,返回UNKNOWN,下次再查
            return LocalTransactionState.UNKNOW;
        }
    }
}
⚠️

executeLocalTransaction 中不要做过多操作,更不要在这里调用外部服务。这个方法在 MQ 的回调线程中执行,耗时会增加事务消息的总延迟。如果需要调用外部服务,建议在本地事务中只操作数据库,其他逻辑放到异步处理。

2.2 生产者配置与使用

// 创建事务消息Producer
TransactionMQProducer producer = new TransactionMQProducer("order-tx-producer");
producer.setNamesrvAddr("name1:9876;name2:9876");
producer.setTransactionListener(new OrderTransactionListener());
producer.start();

// 发送事务消息
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setUserId("user123");
order.setAmount(100.0);

Message msg = new Message("order-topic", "order-tag", order.getId().getBytes(),
    new ObjectMapper().writeValueAsBytes(order));

// 发送时会先发半消息,然后回调executeLocalTransaction
SendResult sendResult = producer.sendMessageInTransaction(msg, order);

if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    System.out.println("事务消息发送成功,状态:" + sendResult.getTransactionId());
} else {
    System.out.println("半消息发送失败");
}

三、生产避坑:事务消息的常见问题

3.1 坑一:本地事务成功但半消息发送失败

场景:本地事务执行成功了,但半消息发送时网络超时。

结果:本地数据库有数据,但MQ里没有半消息,下游永远不知道这笔订单。

解决方案

方案1:半消息发送失败,本地事务回滚
  → RocketMQ事务消息的sendMessageInTransaction
  → 如果半消息发送失败,会抛出异常
  → 在catch中回滚本地事务
  → 但这引入了新问题:如果回滚失败怎么办?

方案2:幂等 + 补偿(推荐)
  → 半消息发送失败,不回滚本地事务
  → 本地事务保持"pending"状态
  → 启动定时任务扫描"pending"的订单
  → 主动补发半消息
  → RocketMQ 4.3+ 支持

3.2 坑二:回查次数耗尽导致消息卡死

场景:本地事务一直是UNKNOWN状态(比如数据库锁等待),MQ回查15次后最终回滚。

问题

  • 订单本地已创建,但MQ认为失败了
  • 下游收不到消息,但本地数据已存在
  • 数据不一致!

解决方案

// 回查时,如果订单正在处理中,返回COMMIT
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    Order order = orderService.getOrderById(msg.getKeys());

    if ("PROCESSING".equals(order.getStatus())) {
        // 还在处理中,说明本地事务最终会成功
        // 返回COMMIT,让消息投递给下游
        // 下游消费时再做最终校验
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    if ("SUCCESS".equals(order.getStatus())) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    return LocalTransactionState.ROLLBACK_MESSAGE;
}
💡

事务消息的最佳实践:本地事务只做"核心操作"(如扣款),不做"非核心操作"(如发通知)。如果本地事务最终失败,回查后MQ回滚半消息,消息不会投递给下游。非核心操作由下游Consumer自己保证幂等。

3.3 坑三:事务消息和普通消息混用导致消费顺序错乱

场景:同一订单既发事务消息又发普通消息,消费顺序错乱。

问题

  • 事务消息的投递时间不确定(取决于回查完成时间)
  • 普通消息立即投递
  • 可能导致"通知先到,订单后到"的情况

解决方案

  • 同一业务链路的全部消息都用事务消息
  • 或者在同一事务内同时发送所有相关消息

四、工程选型:事务消息的适用场景

4.1 事务消息的适用判断

场景是否适用原因
订单创建 + 库存扣减适用需要本地事务原子性
订单创建 + 积分发放适用需要本地事务原子性
用户注册 + 欢迎邮件不适用注册失败不影响发邮件
支付成功 + 回调通知不适用支付成功后才能通知

4.2 事务消息 vs 其他分布式事务方案

方案一致性复杂度延迟适用场景
RocketMQ事务消息最终一致MQ链路的事务
Seata AT模式强一致跨库事务
Seata TCC模式强一致高性能跨服务
本地消息表最终一致无MQ事务支持的场景

【面试官心理】

面试这道题,我会追问一个实际场景:"如果本地事务执行成功了,但下游Consumer处理失败了怎么办?"能说出"事务消息只能保证本地事务和消息发送的一致性,不能保证下游处理成功"、"需要Consumer端幂等 + 重试机制"的,说明真正理解了这个方案的能力边界。只知道背流程、不理解边界的,是典型的P5。