本地消息表

某团队在实现分布式事务时,没有使用 Seata 或 MQ,而是选择了"本地消息表"方案:

在每个服务的本地数据库中,增加一张"消息表"。本地业务操作和消息插入在同一事务中完成。消息表通过定时任务轮询,发送到 MQ,消费者收到后更新消息状态。

【架构权衡】 本地消息表是最轻量级的分布式事务方案,不需要引入额外组件(MQ、TCC 框架等),只需要数据库即可实现。但它的缺点是:消息发送是"非原子性"的——本地事务成功,消息发送可能失败;或者消息发送成功,但本地事务被回滚了。本地消息表通过"消息状态"和"定时轮询"解决了这个问题。


一、核心问题 🔴

1.1 方案原理

本地消息表的核心思想:

┌─────────────────────────────────────────────────────────────────┐
│                        本地事务 + 消息表                          │
│                                                                  │
│  数据库:                                                        │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                     业务表                                │  │
│  │  order (id, user_id, amount, status, ...)               │  │
│  └──────────────────────────────────────────────────────────┘  │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │                     消息表                                │  │
│  │  outbox (id, business_id, message_type, status, ...)   │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
│  事务:                                                          │
│  BEGIN TRANSACTION                                             │
│    INSERT INTO order (...) VALUES (...);  -- 业务操作          │
│    INSERT INTO outbox (...) VALUES (...); -- 消息记录          │
│  COMMIT TRANSACTION                                            │
│                                                                  │
│  特点:                                                          │
│  ├─ 业务操作和消息记录在同一事务中                              │
│  ├─ 要么都成功,要么都回滚                                      │
│  └─ 解决了"本地成功、消息发送失败"的问题                        │
└─────────────────────────────────────────────────────────────────┘

1.2 消息发送流程

消息发送流程:

1. 业务操作 + 消息记录(同一事务)
   └─ order: INSERT ..., outbox: INSERT (status=PENDING)

2. 定时任务轮询 outbox 表
   └─ SELECT * FROM outbox WHERE status=PENDING LIMIT 100

3. 发送消息到 MQ
   └─ 消息发送成功

4. 更新消息状态
   └─ UPDATE outbox SET status=SENT WHERE id=?

5. 消费者收到消息,处理下游业务
   └─ consumer.process()

6. 消费者确认
   └─ ACK

二、方案实现

2.1 数据库表设计

-- 消息表
CREATE TABLE outbox (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64) NOT NULL COMMENT '业务ID',
    message_type VARCHAR(64) NOT NULL COMMENT '消息类型',
    payload TEXT NOT NULL COMMENT '消息内容(JSON)',
    status VARCHAR(16) NOT NULL DEFAULT 'PENDING' COMMENT '状态:PENDING/SENT/FAILED',
    retry_count INT DEFAULT 0 COMMENT '重试次数',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_create (status, create_time),
    INDEX idx_business_id (business_id)
) COMMENT '本地消息表';

2.2 业务代码实现

@Service
public class OrderServiceWithOutbox {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private OutboxRepository outboxRepository;

    @Transactional
    public void createOrder(Order order) {
        // 1. 执行业务操作
        order.setStatus(OrderStatus.CREATED);
        orderRepository.save(order);

        // 2. 记录消息到 outbox 表(同一事务)
        OutboxMessage message = new OutboxMessage();
        message.setBusinessId(order.getId().toString());
        message.setMessageType("ORDER_CREATED");
        message.setPayload(JSON.toJSONString(order));
        message.setStatus("PENDING");
        outboxRepository.save(message);

        // 事务提交后,order 和 outbox 都持久化了
    }
}

2.3 消息发送器

@Service
public class OutboxMessageSender {

    @Autowired
    private OutboxRepository outboxRepository;

    @Autowired
    private MessageProducer messageProducer;

    @Scheduled(fixedDelay = 1000) // 每秒轮询一次
    public void sendPendingMessages() {
        // 1. 获取待发送消息
        List<OutboxMessage> pendingMessages = outboxRepository
            .findByStatusWithLock("PENDING", 100);

        for (OutboxMessage message : pendingMessages) {
            try {
                // 2. 发送消息到 MQ
                messageProducer.send(message.getMessageType(),
                    message.getBusinessId(),
                    message.getPayload());

                // 3. 更新状态为 SENT
                message.setStatus("SENT");
                outboxRepository.update(message);

            } catch (Exception e) {
                // 4. 发送失败,重试计数
                message.setRetryCount(message.getRetryCount() + 1);
                if (message.getRetryCount() >= 3) {
                    message.setStatus("FAILED");
                }
                outboxRepository.update(message);
                log.error("消息发送失败,messageId={}", message.getId(), e);
            }
        }
    }
}

【架构权衡】 本地消息表的核心优势是轻量级:不需要引入额外的分布式事务组件,只需要数据库即可。但它的缺点是:

  1. 消息发送有延迟(轮询间隔)
  2. 需要额外的 outbox 表
  3. 需要定时任务轮询

三、工程代价评估

维度评估
组件依赖低(只需要数据库)
实现复杂度
一致性最终一致
延迟中(轮询间隔)
可靠性高(消息状态跟踪)

四、落地 Checklist

  • 表设计:设计 outbox 表结构
  • 事务保证:业务操作和消息记录必须在同一事务
  • 轮询实现:实现定时轮询发送
  • 重试机制:处理消息发送失败
  • 监控告警:监控 PENDING 消息堆积
  • 补偿机制:处理长期未发送的消息