Kafka 消息可靠性

某团队在重要业务中使用了 Kafka 消息队列,结果出现了消息丢失的问题。

场景:用户支付成功后,系统需要发送"支付成功通知"。但在极端情况下,消息发送后,Kafka Broker 崩溃,消息丢失,导致用户没收到通知。

排查后发现:生产者配置使用了 acks=0,消息发送后不等待确认。

这就是 Kafka 消息可靠性配置不当导致的问题。

【架构权衡】 Kafka 的消息可靠性由多个环节共同保障:生产端确认机制、Broker 副本机制、消费端手动提交。需要根据业务需求在"可靠性"和"性能"之间做出权衡。


一、核心问题 🔴

1.1 可靠性配置

可靠性配置层级:

1. 生产端配置
   ├─ acks=0:不等确认(最快,最不安全)
   ├─ acks=1:Leader 确认(中等)
   └─ acks=all:所有 ISR 确认(最安全,较慢)

2. Broker 端配置
   ├─ replication.factor >= 3
   ├─ min.insync.replicas >= 2
   └─ unclean.leader.election.enable=false

3. 消费端配置
   ├─ enable.auto.commit=false
   └─ 手动提交 offset

1.2 Exactly-Once 语义

Kafka 的消息语义:

At-Most-Once(最多一次):
├─ 生产:异步发送,不重试
├─ 消费:先提交 offset,再处理消息
└─ 风险:消息可能丢失

At-Least-Once(至少一次):
├─ 生产:同步发送 + 重试
├─ 消费:先处理消息,再提交 offset
└─ 风险:消息可能重复

Exactly-Once(恰好一次):
├─ Kafka Streams:事务性输出
├─ 幂等生产者:防止重复发送
└─ 事务性消费:消费+生产原子性

1.3 幂等生产者

// 启用幂等生产者
spring:
  kafka:
    producer:
      acks: all
      enable-idempotence: true
      max-in-flight-per-connection: 5

// 幂等生产者保证:
// 1. 单会话内消息不重复
// 2. 单 Producer 实例保证
// 3. 需要配合 acks=all 使用

二、生产避坑

2.1 消息丢失场景

// ❌ 场景1:acks=0
kafkaTemplate.send("topic", message); // 不等待确认

// ✅ 场景1:acks=all
ListenableFuture<SendResult<String, String>> future =
    kafkaTemplate.send("topic", message);
future.get(10, TimeUnit.SECONDS); // 同步等待

// ❌ 场景2:acks=1,Leader 崩溃
// acks=1 时,只有 Leader 确认,Follower 可能没同步

// ✅ 场景2:acks=all + min.insync.replicas=2
// 确保至少 2 个副本确认,包括 Leader

2.2 消息重复场景

// 幂等消费者:使用唯一键去重
@Service
public class IdempotentConsumer {

    @Autowired
    private IdempotentRepository repository;

    @KafkaListener(topics = "order-topic")
    public void consume(String message) {
        OrderMessage orderMessage = JSON.parseObject(message, OrderMessage.class);

        // 幂等去重
        if (repository.existsByOrderId(orderMessage.getOrderId())) {
            log.info("订单已处理,跳过: {}", orderMessage.getOrderId());
            return;
        }

        // 处理订单
        processOrder(orderMessage);

        // 标记已处理
        repository.save(orderMessage.getOrderId());
    }
}

三、可靠性配置清单

spring:
  kafka:
    # 生产端
    producer:
      acks: all                    # 所有 ISR 确认
      retries: 3                 # 重试次数
      enable-idempotence: true     # 幂等生产
      max-in-flight-per-connection: 5

    # Broker 端
    bootstrap-servers: broker1:9092,broker2:9092,broker3:9092

    # 消费端
    consumer:
      enable-auto-commit: false    # 手动提交
      auto-offset-reset: earliest  # 最早未消费消息
      isolation-level: read_committed  # 读取已提交消息

四、落地 Checklist

  • 可靠性评估:明确业务对消息可靠性的要求
  • 生产端配置:acks=all + 幂等生产者
  • Broker 配置:replication.factor >= 3
  • 消费端配置:手动提交 offset
  • 幂等设计:消费端幂等去重
  • 监控部署:监控消息 Lag、丢失率